Commit 7b89141e authored by sean's avatar sean Committed by ilixiaocui
Browse files

curve_ops_tool: fix concurrent check copysets on server

parent f504116f
No related merge requests found
Showing with 123 additions and 44 deletions
+123 -44
......@@ -6,6 +6,8 @@ mdsDummyPort=6667
rpcTimeout=500
# rpc重试次数
rpcRetryTimes=5
# the rpc concurrency to chunkserver
rpcConcurrentNum=10
# etcd地址
etcdAddr=127.0.0.1:2379
# snapshot clone server 地址
......
......@@ -277,6 +277,7 @@ s3_throttle_bpsWriteLimit: 1280
# 运维工具默认值
tool_rpc_timeout: 500
tool_rpc_retry_times: 5
tool_rpc_concurrent_num: 10
# snapshotclone_nginx配置
nginx_docker_internal_port: 80
......
......@@ -12,6 +12,8 @@ mdsDummyPort={{ hostvars[groups.mds[0]].mds_dummy_port }}
rpcTimeout={{ tool_rpc_timeout }}
# rpc重试次数
rpcRetryTimes={{ tool_rpc_retry_times }}
# the rpc concurrency to chunkserver
rpcConcurrentNum={{ tool_rpc_concurrent_num }}
# etcd地址
{% set etcd_address=[] -%}
{% for host in groups.etcd -%}
......
......@@ -1012,7 +1012,7 @@ void ScanChunkRequest::BuildAndSendScanMap(const ChunkRequest &request,
request.sendscanmapretryintervalus(),
cntl, channel);
LOG(INFO) << "logid = " << cntl->log_id()
<< "Sending scanmap: " << scanMap->ShortDebugString()
<< " Sending scanmap: " << scanMap->ShortDebugString()
<< " to leader: " << peer_.addr;
stub.FollowScanMap(cntl, scanMapRequest, scanMapResponse, done);
}
......
......@@ -148,12 +148,21 @@ int CopysetCheckCore::CheckCopysetsOnChunkServer(
}
ChunkServerHealthStatus CopysetCheckCore::CheckCopysetsOnChunkServer(
const std::string& chunkserverAddr,
const std::set<std::string>& groupIds,
bool queryLeader) {
const std::string& chunkserverAddr,
const std::set<std::string>& groupIds,
bool queryLeader,
std::pair<int, butil::IOBuf> *record,
bool queryCs) {
bool isHealthy = true;
int res = 0;
butil::IOBuf iobuf;
int res = QueryChunkServer(chunkserverAddr, &iobuf);
if (queryCs) {
res = QueryChunkServer(chunkserverAddr, &iobuf);
} else {
res = record->first;
iobuf = record->second;
}
if (res != 0) {
// 如果查询chunkserver失败,认为不在线,把它上面所有的
// copyset都添加到peerNotOnlineCopysets_里面
......@@ -244,16 +253,18 @@ ChunkServerHealthStatus CopysetCheckCore::CheckCopysetsOnChunkServer(
isHealthy = false;
}
}
// 遍历没有leader的copyset
bool health = CheckCopysetsNoLeader(chunkserverAddr,
noLeaderCopysetsPeers);
if (!health) {
isHealthy = false;
}
// 遍历chunkserver发送请求
for (const auto& item : csAddrMap) {
ChunkServerHealthStatus res = CheckCopysetsOnChunkServer(item.first,
item.second);
item.second);
if (res != ChunkServerHealthStatus::kHealthy) {
isHealthy = false;
}
......@@ -266,9 +277,9 @@ ChunkServerHealthStatus CopysetCheckCore::CheckCopysetsOnChunkServer(
}
bool CopysetCheckCore::CheckCopysetsNoLeader(const std::string& csAddr,
const std::map<std::string,
std::vector<std::string>>&
copysetsPeers) {
const std::map<std::string,
std::vector<std::string>>&
copysetsPeers) {
if (copysetsPeers.empty()) {
return true;
}
......@@ -351,9 +362,31 @@ int CopysetCheckCore::CheckCopysetsOnServer(const std::string& serverIp,
return CheckCopysetsOnServer(0, serverIp, true, unhealthyChunkServers);
}
void CopysetCheckCore::ConcurrentCheckCopysetsOnServer(
const std::vector<ChunkServerInfo> &chunkservers,
uint32_t *index, std::map<std::string,
std::pair<int, butil::IOBuf>> *result) {
while (1) {
indexMutex.lock();
if (*index + 1 > chunkservers.size()) {
indexMutex.unlock();
break;
}
auto info = chunkservers[*index];
(*index)++;
indexMutex.unlock();
std::string csAddr = info.hostip() + ":" + std::to_string(info.port());
butil::IOBuf iobuf;
int res = QueryChunkServer(csAddr, &iobuf);
mapMutex.lock();
result->emplace(csAddr, std::make_pair(res, iobuf));
mapMutex.unlock();
}
}
int CopysetCheckCore::CheckCopysetsOnServer(const ServerIdType& serverId,
const std::string& serverIp,
bool queryLeader,
const std::string& serverIp, bool queryLeader,
std::vector<std::string>* unhealthyChunkServers) {
bool isHealthy = true;
// 向mds发送RPC
......@@ -368,19 +401,31 @@ int CopysetCheckCore::CheckCopysetsOnServer(const ServerIdType& serverId,
std::cout << "ListChunkServersOnServer fail!" << std::endl;
return -1;
}
for (const auto& info : chunkservers) {
std::string ip = info.hostip();
uint64_t port = info.port();
std::string csAddr = ip + ":" + std::to_string(port);
ChunkServerHealthStatus res = CheckCopysetsOnChunkServer(csAddr,
{}, queryLeader);
std::vector<Thread> threadpool;
std::map<std::string, std::pair<int, butil::IOBuf>> queryCsResult;
uint32_t index = 0;
for (int i = 0; i < FLAGS_rpcConcurrentNum; i++) {
threadpool.emplace_back(Thread(
&CopysetCheckCore::ConcurrentCheckCopysetsOnServer,
this, std::ref(chunkservers), &index,
&queryCsResult));
}
for (auto &thread : threadpool) {
thread.join();
}
for (auto &record : queryCsResult) {
std::string chunkserverAddr = record.first;
auto res = CheckCopysetsOnChunkServer(chunkserverAddr, {}, queryLeader,
&record.second, false);
if (res != ChunkServerHealthStatus::kHealthy) {
isHealthy = false;
if (unhealthyChunkServers) {
unhealthyChunkServers->emplace_back(csAddr);
unhealthyChunkServers->emplace_back(chunkserverAddr);
}
}
}
if (isHealthy) {
return 0;
} else {
......@@ -617,12 +662,15 @@ void CopysetCheckCore::ParseResponseAttachment(
int CopysetCheckCore::QueryChunkServer(const std::string& chunkserverAddr,
butil::IOBuf* iobuf) {
int res = csClient_->Init(chunkserverAddr);
// unit test will set csClient_ to mock
auto csClient = (csClient_ == nullptr) ?
std::make_shared<ChunkServerClient>() : csClient_;
int res = csClient->Init(chunkserverAddr);
if (res != 0) {
std::cout << "Init chunkserverClient fail!" << std::endl;
return -1;
}
return csClient_->GetRaftStatus(iobuf);
return csClient->GetRaftStatus(iobuf);
}
void CopysetCheckCore::UpdateChunkServerCopysets(
......@@ -638,13 +686,15 @@ void CopysetCheckCore::UpdateChunkServerCopysets(
// 通过发送RPC检查chunkserver是否在线
bool CopysetCheckCore::CheckChunkServerOnline(
const std::string& chunkserverAddr) {
int res = csClient_->Init(chunkserverAddr);
auto csClient = (csClient_ == nullptr) ?
std::make_shared<ChunkServerClient>() : csClient_;
int res = csClient->Init(chunkserverAddr);
if (res != 0) {
std::cout << "Init chunkserverClient fail!" << std::endl;
chunkserverCopysets_[chunkserverAddr] = {};
return false;
}
bool online = csClient_->CheckChunkServerOnline();
bool online = csClient->CheckChunkServerOnline();
if (!online) {
chunkserverCopysets_[chunkserverAddr] = {};
}
......
......@@ -43,6 +43,7 @@
#include "src/tools/metric_name.h"
#include "src/tools/curve_tool_define.h"
#include "include/chunkserver/chunkserver_common.h"
#include "src/common/concurrent/concurrent.h"
using curve::mds::topology::PoolIdType;
using curve::mds::topology::CopySetIdType;
......@@ -54,6 +55,8 @@ using curve::mds::topology::ChunkServerStatus;
using curve::chunkserver::ToGroupId;
using curve::chunkserver::GetPoolID;
using curve::chunkserver::GetCopysetID;
using curve::common::Mutex;
using curve::common::Thread;
namespace curve {
namespace tool {
......@@ -106,8 +109,8 @@ const char kThreeCopiesInconsistent[] = "Three copies inconsistent";
class CopysetCheckCore {
public:
CopysetCheckCore(std::shared_ptr<MDSClient> mdsClient,
std::shared_ptr<ChunkServerClient> csClient) :
mdsClient_(mdsClient), csClient_(csClient) {}
std::shared_ptr<ChunkServerClient> csClient = nullptr) :
mdsClient_(mdsClient), csClient_(csClient) {}
virtual ~CopysetCheckCore() = default;
/**
......@@ -283,19 +286,23 @@ class CopysetCheckCore {
const std::string& chunkserverAddr);
/**
* @brief 检查某个chunkserver上的copyset的健康状态
* @brief check copysets' healthy status on chunkserver
*
* @param chunkserAddr chunkserver的地址
* @param groupIds 要检查的复制组的groupId,默认为空,全部检查
* @param queryLeader 是否向leader所在的chunkserver发送RPC查询,
* 对于检查cluster来说,所有chunkserver都会遍历到,不用查询
* @param[in] chunkserAddr: chunkserver address
* @param[in] groupIds: groupId for check, default is null, check all the copysets
* @param[in] queryLeader: whether send rpc to chunkserver which copyset leader on.
* All the chunkserves will be check when check clusters status.
* @param[in] record: raft state rpc response from chunkserver
* @param[in] queryCs: whether send rpc to chunkserver
*
* @return 返回错误码
* @return error code
*/
ChunkServerHealthStatus CheckCopysetsOnChunkServer(
const std::string& chunkserverAddr,
const std::set<std::string>& groupIds,
bool queryLeader = true);
const std::string& chunkserverAddr,
const std::set<std::string>& groupIds,
bool queryLeader = true,
std::pair<int, butil::IOBuf> *record = nullptr,
bool queryCs = true);
/**
* @brief 检查某个server上的所有copyset的健康状态
......@@ -312,6 +319,17 @@ class CopysetCheckCore {
bool queryLeader = true,
std::vector<std::string>* unhealthyChunkServers = nullptr);
/**
* @brief concurrent check copyset on server
* @param[in] chunkservers: chunkservers on server
* @param[in] index: the deal index of chunkserver
* @param[in] result: rpc response from chunkserver
*/
void ConcurrentCheckCopysetsOnServer(
const std::vector<ChunkServerInfo> &chunkservers,
uint32_t *index,
std::map<std::string, std::pair<int, butil::IOBuf>> *result);
/**
* @brief 根据leader的map里面的copyset信息分析出copyset是否健康,健康返回0,否则
* 否则返回错误码
......@@ -414,7 +432,7 @@ class CopysetCheckCore {
// 向mds发送RPC的client
std::shared_ptr<MDSClient> mdsClient_;
// 向chunkserver发送RPC的client
// for unittest mock csClient
std::shared_ptr<ChunkServerClient> csClient_;
// 保存copyset的信息
......@@ -431,6 +449,11 @@ class CopysetCheckCore {
std::string copysetsDetail_;
const std::string kEmptyAddr = "0.0.0.0:0:0";
// mutex for concurrent rpc to chunkserver
Mutex indexMutex;
Mutex vectorMutex;
Mutex mapMutex;
};
} // namespace tool
......
......@@ -31,6 +31,7 @@ DEFINE_string(mdsDummyPort, "6667", "dummy port of mds, "
DEFINE_string(etcdAddr, "127.0.0.1:2379", "etcd addr");
DEFINE_uint64(rpcTimeout, 3000, "millisecond for rpc timeout");
DEFINE_uint64(rpcRetryTimes, 5, "rpc retry times");
DEFINE_uint64(rpcConcurrentNum, 10, "rpc concurrent number to chunkserver");
DEFINE_string(snapshotCloneAddr, "127.0.0.1:5555", "snapshot clone addr");
DEFINE_string(snapshotCloneDummyPort, "8081", "dummy port of snapshot clone, "
"can specify one or several. "
......
......@@ -31,6 +31,7 @@ DECLARE_string(mdsDummyPort);
DECLARE_string(etcdAddr);
DECLARE_uint64(rpcTimeout);
DECLARE_uint64(rpcRetryTimes);
DECLARE_uint64(rpcConcurrentNum);
DECLARE_string(snapshotCloneAddr);
DECLARE_string(snapshotCloneDummyPort);
DECLARE_uint64(chunkSize);
......
......@@ -49,9 +49,7 @@ std::shared_ptr<CurveTool> CurveToolFactory::GenerateCurveTool(
std::shared_ptr<StatusTool> CurveToolFactory::GenerateStatusTool() {
auto mdsClient = std::make_shared<MDSClient>();
auto etcdClient = std::make_shared<EtcdClient>();
auto csClient = std::make_shared<ChunkServerClient>();
auto copysetCheck =
std::make_shared<CopysetCheckCore>(mdsClient, csClient);
auto copysetCheck = std::make_shared<CopysetCheckCore>(mdsClient);
auto metricClient = std::make_shared<MetricClient>();
auto snapshotCloneClient =
std::make_shared<SnapshotCloneClient>(metricClient);
......@@ -83,9 +81,7 @@ std::shared_ptr<CurveCli> CurveToolFactory::GenerateCurveCli() {
std::shared_ptr<CopysetCheck> CurveToolFactory::GenerateCopysetCheck() {
auto mdsClient = std::make_shared<MDSClient>();
auto csClient = std::make_shared<ChunkServerClient>();
auto core = std::make_shared<curve::tool::CopysetCheckCore>(mdsClient,
csClient);
auto core = std::make_shared<curve::tool::CopysetCheckCore>(mdsClient);
return std::make_shared<CopysetCheck>(core);
}
......@@ -96,9 +92,8 @@ std::shared_ptr<ScheduleTool> CurveToolFactory::GenerateScheduleTool() {
std::shared_ptr<CopysetTool> CurveToolFactory::GenerateCopysetTool() {
auto mdsClient = std::make_shared<MDSClient>();
auto csClient = std::make_shared<ChunkServerClient>();
auto copysetCheck =
std::make_shared<curve::tool::CopysetCheckCore>(mdsClient, csClient);
std::make_shared<curve::tool::CopysetCheckCore>(mdsClient);
return std::make_shared<CopysetTool>(copysetCheck, mdsClient);
}
......
......@@ -88,6 +88,10 @@ void UpdateFlagsFromConf(curve::common::Configuration* conf) {
if (GetCommandLineFlagInfo("rpcRetryTimes", &info) && info.is_default) {
conf->GetUInt64Value("rpcRetryTimes", &FLAGS_rpcRetryTimes);
}
if (GetCommandLineFlagInfo("rpcConcurrentNum", &info) &&
info.is_default) {
conf->GetUInt64Value("rpcConcurrentNum", &FLAGS_rpcConcurrentNum);
}
if (GetCommandLineFlagInfo("snapshotCloneAddr", &info) &&
info.is_default) {
conf->GetStringValue("snapshotCloneAddr", &FLAGS_snapshotCloneAddr);
......
......@@ -946,10 +946,10 @@ int StatusTool::PrintChunkserverStatus(bool checkLeftSize) {
if (useWalPool && !useChunkFilePoolAsWalPool) {
PrintCsLeftSizeStatistics("walfilepool", poolWalSegmentLeftSize);
} else if (useChunkFilePoolAsWalPool) {
std::cout << "No walpool left size fount, "
std::cout << "No walpool left size found, "
<< "use chunkfilepool as walpool!\n";
} else {
std::cout << "No walpool left size fount, "
std::cout << "No walpool left size found, "
<< "no walpool used!\n";
}
return ret;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment