Commit 23051691 authored by xuchaojie's avatar xuchaojie
Browse files

add topology and copyset code

Change-Id: I01f2b27bc5424ed4566a4aa83851e6111c0d6744
parent 817cb92e
Showing with 1715 additions and 14 deletions
+1715 -14
......@@ -65,3 +65,6 @@ bazel-testlogs
# vscode
.vscode
# vimprj
.vimprj/
BUILD 0 → 100644
......@@ -12,4 +12,3 @@ cc_library(
],
visibility = ["//visibility:public"],
)
copts.bzl 0 → 100644
GCC_FLAGS = [
"-pthread",
"-std=c++11",
# "-Werror",
# "-Wsign-compare",
# "-Wno-unused-parameter",
# "-Wno-unused-variable",
# "-Woverloaded-virtual",
# "-Wnon-virtual-dtor",
# "-Wno-missing-field-initializers",
]
GCC_TEST_FLAGS = [
"-pthread",
"-std=c++11",
# "-Werror",
# "-Wsign-compare",
# "-Wno-unused-parameter",
# "-Wno-unused-variable",
# "-Woverloaded-virtual",
# "-Wnon-virtual-dtor",
# "-Wno-missing-field-initializers",
]
......@@ -6,8 +6,18 @@ cc_proto_library (
proto_library(
name = "nameserver2_proto",
srcs = ["nameserver2.proto",
"topology.proto",],
srcs = ["nameserver2.proto"],
)
cc_proto_library(
name = "topology_cc_proto",
deps = [":topology_proto"],
visibility = ["//visibility:public"],
)
proto_library(
name = "topology_proto",
srcs = ["topology.proto"],
)
cc_proto_library(
......@@ -26,5 +36,6 @@ proto_library(
"configuration.proto",
"chunkserver.proto",
]),
visibility = ["//visibility:public"],
)
......@@ -37,6 +37,7 @@ cc_library(
"//include/client:include_client",
"//include:include-common",
"//proto:nameserver2_cc_proto",
"//proto:topology_cc_proto",
"//proto:chunkserver-cc-protos",
# "//src/chunkserver:chunkserver-lib"
],
......@@ -94,4 +95,4 @@ cc_library(
"//external:gflags",
"//external:protobuf",
],
)
\ No newline at end of file
)
load(
"//:copts.bzl",
"GCC_FLAGS",
"GCC_TEST_FLAGS"
)
cc_library(
name = "mds_common",
srcs = glob(["*.cpp", "*.h"]),
copts = GCC_FLAGS,
visibility = ["//visibility:public"],
)
/*
* Project: curve
* Created Date: Fri Aug 17 2018
* Author: xuchaojie
* Copyright (c) 2018 netease
*/
#ifndef CURVE_SRC_MDS_COMMON_TOPOLOGY_DEFINE_H_
#define CURVE_SRC_MDS_COMMON_TOPOLOGY_DEFINE_H_
namespace curve {
namespace mds {
namespace topology {
typedef uint16_t PoolIdType;
typedef uint32_t ZoneIdType;
typedef uint32_t ServerIdType;
typedef uint32_t ChunkServerIdType;
typedef uint32_t UserIdType;
typedef uint32_t CopySetIdType;
// TODO(xuchaojie): 修改为从配置文件读取的数据
const uint64_t kChunkServerStateUpdateFreq = 600;
// topology Error Code
const int kTopoErrCodeSuccess = 0;
const int kTopoErrCodeInternalError = -1;
const int kTopoErrCodeInvalidParam = -2;
const int kTopoErrCodeInitFail = -3;
const int kTopoErrCodeStorgeFail = -4;
const int kTopoErrCodeIdDuplicated = -5;
const int kTopoErrCodeChunkServerNotFound = -6;
const int kTopoErrCodeServerNotFound = -7;
const int kTopoErrCodeZoneNotFound = -8;
const int kTopoErrCodePhysicalPoolNotFound = -9;
const int kTopoErrCodeLogicalPoolNotFound = -10;
const int kTopoErrCodeCopySetNotFound = -11;
const int kTopoErrCodeGenCopysetErr = -12;
const int kTopoErrCodeAllocateIdFail = -13;
} // namespace topology
} // namespace mds
} // namespace curve
#endif // CURVE_SRC_MDS_COMMON_TOPOLOGY_DEFINE_H_
load(
"//:copts.bzl",
"GCC_FLAGS",
"GCC_TEST_FLAGS"
)
cc_library(
name = "copyset",
srcs = glob(["*.cpp", "*.h"]),
deps = ["//external:glog", "//src/mds/common:mds_common"],
copts = GCC_FLAGS,
visibility = ["//visibility:public"],
)
/*
* Project:
* Created Date: Wed Oct 10 2018
* Author: xuchaojie
* Copyright (c) 2018 netease
*/
#include "src/mds/copyset/copyset_manager.h"
namespace curve {
namespace mds {
namespace copyset {
std::shared_ptr<CopysetPolicy> CopysetManager::GetCopysetPolicy(
uint32_t zoneNum,
uint32_t zoneChoseNum,
uint32_t replicaNum) const {
if ((3 == zoneNum) &&
(3 == zoneChoseNum) &&
(3 == replicaNum)) {
return std::make_shared<CopysetZoneShufflePolicy>(
std::make_shared<CopysetPermutationPolicy333>());
} else {
return nullptr;
}
}
} // namespace copyset
} // namespace mds
} // namespace curve
/*
* Project:
* Created Date: Wed Oct 10 2018
* Author: xuchaojie
* Copyright (c) 2018 netease
*/
#ifndef CURVE_SRC_MDS_COPYSET_COPYSET_MANAGER_H_
#define CURVE_SRC_MDS_COPYSET_COPYSET_MANAGER_H_
#include <memory>
#include "src/mds/copyset/copyset_policy.h"
namespace curve {
namespace mds {
namespace copyset {
class CopysetManager {
public:
CopysetManager() {}
~CopysetManager() {}
std::shared_ptr<CopysetPolicy> GetCopysetPolicy(uint32_t zoneNum,
uint32_t zoneChoseNum,
uint32_t replicaNum) const;
};
} // namespace copyset
} // namespace mds
} // namespace curve
#endif // CURVE_SRC_MDS_COPYSET_COPYSET_MANAGER_H_
/*
* Project: curve
* Created Date: Thu Oct 11 2018
* Author: xuchaojie
* Copyright (c) 2018 netease
*/
#include "src/mds/copyset/copyset_policy.h"
#include <glog/logging.h>
#include <algorithm>
#include <cassert>
#include <cmath>
#include <iostream>
#include <sstream>
#include <iterator>
#include <set>
#include <unordered_set>
#include <random>
#include <utility>
namespace curve {
namespace mds {
namespace copyset {
using ::curve::mds::topology::ChunkServerIdType;
bool operator<(const Copyset& lhs, const Copyset& rhs) {
std::ostringstream buf1;
std::copy(lhs.replicas.begin(),
lhs.replicas.end(),
std::ostream_iterator<ChunkServerIdType>(buf1, "_"));
std::ostringstream buf2;
std::copy(rhs.replicas.begin(),
rhs.replicas.end(),
std::ostream_iterator<ChunkServerIdType>(buf2, "_"));
return buf1.str() < buf2.str();
}
std::ostream& operator<<(std::ostream& out, const Copyset& rhs) {
out << "{copyset: ";
std::copy(rhs.replicas.begin(),
rhs.replicas.end(),
std::ostream_iterator<ChunkServerIdType>(out, " "));
return out << "}";
}
std::ostream& operator<<(std::ostream& out, const ChunkServerInfo& rhs) {
return out << "{server:"
<< rhs.id
<< ",zone:"
<< rhs.location.zoneId
<< "}";
}
bool CopysetZoneShufflePolicy::GenCopyset(const ClusterInfo& cluster,
int numCopysets,
std::vector<Copyset>* out) {
std::vector<ChunkServerInfo> chunkServers = cluster.GetChunkServerInfo();
uint32_t numReplicas = permutationPolicy_->GetReplicaNum();
std::vector<ChunkServerInfo> replicas(numReplicas);
std::set<Copyset> unique_set;
int maxNum = GetMaxPermutationNum(numCopysets,
chunkServers.size(),
numReplicas);
for (int i = 0; i < maxNum; i++) {
if (!permutationPolicy_->permutation(&chunkServers)) {
return false;
}
for (int i = 0; i < chunkServers.size(); i += numReplicas) {
if (i + numReplicas > chunkServers.size()) {
break;
}
std::copy(chunkServers.begin() + i,
chunkServers.begin() + i + numReplicas,
replicas.begin());
Copyset copyset;
for (auto& replica : replicas) {
copyset.replicas.insert(replica.id);
}
if (unique_set.count(copyset) > 0) {
continue;
}
unique_set.insert(copyset);
out->emplace_back(copyset);
if (--numCopysets == 0) {
LOG(INFO) << "Generate copyset success : ";
for (Copyset& copyset : *out) {
LOG(INFO) << copyset << " ";
}
return true;
}
}
}
return false;
}
bool CopysetZoneShufflePolicy::AddNode(const ClusterInfo& cluster,
const ChunkServerInfo& node,
int scatterWidth,
std::vector<Copyset>* out) {
// TODO(xuchaojie): fix this
return true;
}
bool CopysetZoneShufflePolicy::RemoveNode(const ClusterInfo& cluster,
const ChunkServerInfo& node,
const std::vector<Copyset>& css,
std::vector<std::pair<Copyset, Copyset>>* out) {
// TODO(xuchaojie): fix this
return true;
}
int CopysetZoneShufflePolicy::GetMaxPermutationNum(int numCopysets,
int num_servers,
int num_replicas) {
return numCopysets;
}
bool CopysetPermutationPolicy333::permutation(
std::vector<ChunkServerInfo> *chunkServers) {
std::vector<ChunkServerInfo> chunkServerInZone1,
chunkServerInZone2,
chunkServerInZone3;
curve::mds::topology::ZoneIdType zid1, zid2, zid3;
for (ChunkServerInfo sv : *chunkServers) {
if (0 == chunkServerInZone1.size() ||
zid1 == sv.location.zoneId) {
chunkServerInZone1.push_back(sv);
zid1 = sv.location.zoneId;
} else if (0 == chunkServerInZone2.size() ||
zid2 == sv.location.zoneId) {
chunkServerInZone2.push_back(sv);
zid2 = sv.location.zoneId;
} else if (0 == chunkServerInZone3.size() ||
zid3 == sv.location.zoneId) {
chunkServerInZone3.push_back(sv);
zid3 = sv.location.zoneId;
} else {
// more than 3 zones, add err log
LOG(ERROR) << "[CopysetPermutationPolicy333::permutation]: "
<< "error, cluster has more than 3 zones.";
return false;
}
}
std::random_device rd;
std::mt19937 g(rd());
std::shuffle(chunkServerInZone1.begin(), chunkServerInZone1.end(), g);
std::shuffle(chunkServerInZone2.begin(), chunkServerInZone2.end(), g);
std::shuffle(chunkServerInZone3.begin(), chunkServerInZone3.end(), g);
chunkServers->clear();
for (int i = 0; i < chunkServerInZone1.size() &&
i < chunkServerInZone2.size() &&
i < chunkServerInZone3.size();
i++) {
if (i < chunkServerInZone1.size()) {
chunkServers->push_back(chunkServerInZone1[i]);
}
if (i < chunkServerInZone2.size()) {
chunkServers->push_back(chunkServerInZone2[i]);
}
if (i < chunkServerInZone3.size()) {
chunkServers->push_back(chunkServerInZone3[i]);
}
}
return true;
}
} // namespace copyset
} // namespace mds
} // namespace curve
/*
* Project: curve
* Created Date: Thu Oct 11 2018
* Author: xuchaojie
* Copyright (c) 2018 netease
*/
#ifndef CURVE_SRC_MDS_COPYSET_COPYSET_POLICY_H_
#define CURVE_SRC_MDS_COPYSET_COPYSET_POLICY_H_
#include <set>
#include <vector>
#include <memory>
#include <iostream>
#include <cstdint>
#include <iterator>
#include "src/mds/common/topology_define.h"
namespace curve {
namespace mds {
namespace copyset {
struct Copyset {
std::set<curve::mds::topology::ChunkServerIdType> replicas;
};
bool operator<(const Copyset& lhs, const Copyset& rhs);
std::ostream& operator<<(std::ostream& out, const Copyset& rhs);
struct ChunkServerLocation {
curve::mds::topology::ZoneIdType zoneId;
curve::mds::topology::PoolIdType logicalPoolId;
};
// ChunkServerInfo represents a chunkserver
struct ChunkServerInfo {
curve::mds::topology::ChunkServerIdType id;
ChunkServerLocation location;
};
// for logging
std::ostream& operator<<(std::ostream& out, const ChunkServerInfo& rhs);
class ClusterInfo {
public:
ClusterInfo() {}
virtual ~ClusterInfo() {}
bool GetChunkServerInfo(curve::mds::topology::ChunkServerIdType id,
ChunkServerInfo* out) const {
for (auto& server : csInfo_) {
if (server.id == id) {
*out = server;
return true;
}
}
return false;
}
void AddChunkServerInfo(const ChunkServerInfo &info) {
csInfo_.push_back(info);
}
std::vector<ChunkServerInfo> GetChunkServerInfo() const {
return csInfo_;
}
protected:
std::vector<ChunkServerInfo> csInfo_;
};
class CopysetPolicy {
public:
CopysetPolicy() {}
virtual ~CopysetPolicy() {}
// GenCopyset generate some copysets for a cluster
// return: if succeed return true
virtual bool GenCopyset(const ClusterInfo& cluster,
int numCopysets,
std::vector<Copyset>* out) = 0;
// new node
virtual bool AddNode(const ClusterInfo& cluster,
const ChunkServerInfo& node,
int scatterWidth,
std::vector<Copyset>* out) = 0;
// RemoveNode when node offline, change all copysets cover this node,
// replace origin node with a random node
virtual bool RemoveNode(const ClusterInfo& cluster,
const ChunkServerInfo& node,
const std::vector<Copyset>& css,
std::vector<std::pair<Copyset, Copyset>>* out) = 0;
// Get Max permutation num, make sure the permutation will stop finally
virtual int GetMaxPermutationNum(int numCopysets,
int num_servers,
int num_replicas) = 0;
};
class CopysetPermutationPolicy {
public:
CopysetPermutationPolicy()
: zoneNum_(0),
zoneChoseNum_(0),
replicaNum_(0) {};
CopysetPermutationPolicy(uint32_t zoneNum,
uint32_t zoneChoseNum,
uint32_t replicaNum)
: zoneNum_(zoneNum),
zoneChoseNum_(zoneChoseNum),
replicaNum_(replicaNum) {};
virtual ~CopysetPermutationPolicy(){};
virtual bool permutation(std::vector<ChunkServerInfo> *servers) = 0;
uint32_t GetZoneNum() const {
return zoneNum_;
}
uint32_t GetZoneChosenNum() const {
return zoneChoseNum_;
}
uint32_t GetReplicaNum() const {
return replicaNum_;
}
protected:
const uint32_t zoneNum_;
const uint32_t zoneChoseNum_;
const uint32_t replicaNum_;
};
class CopysetZoneShufflePolicy : public CopysetPolicy {
public:
CopysetZoneShufflePolicy(
std::shared_ptr<CopysetPermutationPolicy> permutationPolicy)
: permutationPolicy_(permutationPolicy) {}
virtual ~CopysetZoneShufflePolicy() {}
// GenCopyset generate some copysets for a cluster
// return: if succeed return true
virtual bool GenCopyset(const ClusterInfo& cluster,
int numCopysets,
std::vector<Copyset>* out);
// new node
virtual bool AddNode(const ClusterInfo& cluster,
const ChunkServerInfo& node,
int scatterWidth,
std::vector<Copyset>* out);
// RemoveNode when node offline, change all copysets cover this node,
// replace origin node with a random node
virtual bool RemoveNode(const ClusterInfo& cluster,
const ChunkServerInfo& node,
const std::vector<Copyset>& css,
std::vector<std::pair<Copyset, Copyset>>* out);
// Get Max permutation num, make sure the permutation will stop finally
virtual int GetMaxPermutationNum(int numCopysets,
int num_servers,
int num_replicas) override;
private:
std::shared_ptr<CopysetPermutationPolicy> permutationPolicy_;
};
class CopysetPermutationPolicy333 : public CopysetPermutationPolicy {
public:
CopysetPermutationPolicy333()
: CopysetPermutationPolicy(3,3,3){}
~CopysetPermutationPolicy333() {}
virtual bool permutation(std::vector<ChunkServerInfo> *servers);
};
} // namespace copyset
} // namespace mds
} // namespace curve
#endif // CURVE_SRC_MDS_COPYSET_COPYSET_POLICY_H_
......@@ -14,6 +14,7 @@
#include "src/mds/nameserver2/inode_id_generator.h"
#include "src/mds/nameserver2/chunk_id_generator.h"
#include "src/mds/topology/topology_admin.h"
#include "src/mds/common/topology_define.h"
using ::curve::mds::topology::TopologyAdmin;
......
cc_library(
name="topology",
hdrs=glob(["*.h"]),
srcs=glob(["*.cpp"]),
visibility = ["//visibility:public"],
)
load(
"//:copts.bzl",
"GCC_FLAGS",
"GCC_TEST_FLAGS"
)
cc_library (
name = "topology",
srcs = glob(["*.cpp", "*.h"]),
deps = ["//external:glog",
"//external:brpc",
"//external:json",
"//proto:topology_cc_proto",
"//src/mds/copyset:copyset",
"//src/mds/common:mds_common",
"//src/repo:repo",
"//proto:chunkserver-cc-protos",
"//proto:nameserver2_cc_proto"],
copts = GCC_FLAGS,
linkopts = [
"-lmysqlcppconn",
],
visibility = ["//visibility:public"],
)
/*
* Project: curve
* Created Date: Mon Aug 27 2018
* Author: xuchaojie
* Copyright (c) 2018 netease
*/
#ifndef CURVE_SRC_MDS_TOPOLOGY_SINGLETON_H_
#define CURVE_SRC_MDS_TOPOLOGY_SINGLETON_H_
#include <mutex>
#include <memory>
template <class T>
class Singleton {
protected:
Singleton() {}
private:
Singleton(const Singleton&) = default;
Singleton& operator=(const Singleton&) = default;
static std::shared_ptr<T> m_instance;
static pthread_mutex_t mutex;
public:
virtual ~Singleton() {}
static std::shared_ptr<T> GetInstance();
};
template <class T>
std::shared_ptr<T> Singleton<T>::GetInstance() {
if (m_instance == nullptr) {
pthread_mutex_lock(&mutex);
if (m_instance == nullptr) {
m_instance = std::shared_ptr<T>(new T());
}
pthread_mutex_unlock(&mutex);
}
return m_instance;
}
template <class T>
pthread_mutex_t Singleton<T>::mutex = PTHREAD_MUTEX_INITIALIZER;
template <class T>
std::shared_ptr<T> Singleton<T>::m_instance = nullptr;
#endif // CURVE_SRC_MDS_TOPOLOGY_SINGLETON_H_
This diff is collapsed.
/*
* Project: curve
* Created Date: Fri Aug 17 2018
* Author: xuchaojie
* Copyright (c) 2018 netease
*/
#ifndef CURVE_SRC_MDS_TOPOLOGY_TOPOLOGY_H_
#define CURVE_SRC_MDS_TOPOLOGY_TOPOLOGY_H_
#include <unordered_map>
#include <string>
#include <list>
#include "proto/topology.pb.h"
#include "src/mds/common/topology_define.h"
#include "src/mds/topology/topology_item.h"
#include "src/mds/topology/topology_id_generator.h"
#include "src/mds/topology/topology_token_generator.h"
#include "src/mds/topology/topology_storge.h"
// TODO(xuchaojie): 增加curve::common::mutex的头文件,以替换下面代码
#include <mutex>
namespace curve {
namespace common {
using mutex = std::mutex;
}
}
namespace curve {
namespace mds {
namespace topology {
class Topology {
public:
Topology(std::shared_ptr<TopologyIdGenerator> idGenerator,
std::shared_ptr<TopologyTokenGenerator> tokenGenerator,
std::shared_ptr<TopologyStorage> storage)
: idGenerator_(idGenerator),
tokenGenerator_(tokenGenerator),
storage_(storage) {
}
~Topology() {}
//从持久化设备加载数据
int init();
// allocate id & token
PoolIdType AllocateLogicalPoolId();
PoolIdType AllocatePhysicalPoolId();
ZoneIdType AllocateZoneId();
ServerIdType AllocateServerId();
ChunkServerIdType AllocateChunkServerId();
CopySetIdType AllocateCopySetId(PoolIdType logicalPoolId);
std::string AllocateToken();
// add
int AddLogicalPool(const LogicalPool &data);
int AddPhysicalPool(const PhysicalPool &data);
int AddZone(const Zone &data);
int AddServer(const Server &data);
int AddChunkServer(const ChunkServer &data);
int AddCopySet(const CopySetInfo &data);
// remove
int RemoveLogicalPool(PoolIdType id);
int RemovePhysicalPool(PoolIdType id);
int RemoveZone(ZoneIdType id);
int RemoveServer(ServerIdType id);
int RemoveChunkServer(ChunkServerIdType id);
int RemoveCopySet(CopySetKey key);
// update
int UpdateLogicalPool(const LogicalPool &data);
int UpdatePhysicalPool(const PhysicalPool &data);
int UpdateZone(const Zone &data);
int UpdateServer(const Server &data);
// 更新内存并持久化全部数据
int UpdateChunkServer(const ChunkServer &data);
// 更新内存,定期持久化数据
int UpdateChunkServerState(const ChunkServerState &state,
ChunkServerIdType id);
int UpdateCopySet(const CopySetInfo &data);
// find
PoolIdType FindLogicalPool(const std::string &logicalPoolName,
const std::string &physicalPoolName) const;
PoolIdType FindPhysicalPool(const std::string &physicalPoolName) const;
ZoneIdType FindZone(const std::string &zoneName,
const std::string &physicalPoolName) const;
ZoneIdType FindZone(const std::string &zoneName,
PoolIdType physicalpoolid) const;
ServerIdType FindServerByHostName(const std::string &hostName) const;
ServerIdType FindServerByHostIp(const std::string &hostIp) const;
ChunkServerIdType FindChunkServer(const std::string &hostIp,
uint32_t port) const;
// get
bool GetLogicalPool(PoolIdType poolId, LogicalPool *out) const;
bool GetPhysicalPool(PoolIdType poolId, PhysicalPool *out) const;
bool GetZone(ZoneIdType zoneId, Zone *out) const;
bool GetServer(ServerIdType serverId, Server *out) const;
bool GetChunkServer(ChunkServerIdType chunkserverId,
ChunkServer *out) const;
bool GetCopySet(CopySetKey key, CopySetInfo *out);
bool GetLogicalPool(const std::string &logicalPoolName,
const std::string &physicalPoolName,
LogicalPool *out) const {
return GetLogicalPool(
FindLogicalPool(logicalPoolName, physicalPoolName), out);
}
bool GetPhysicalPool(const std::string &physicalPoolName,
PhysicalPool *out) const {
return GetPhysicalPool(FindPhysicalPool(physicalPoolName), out);
}
bool GetZone(const std::string &zoneName,
const std::string &physicalPoolName,
Zone *out) const {
return GetZone(FindZone(zoneName, physicalPoolName), out);
}
bool GetZone(const std::string &zoneName,
PoolIdType physicalPoolId,
Zone *out) const {
return GetZone(FindZone(zoneName, physicalPoolId), out);
}
bool GetServerByHostName(const std::string &hostName,
Server *out) const {
return GetServer(FindServerByHostName(hostName), out);
}
bool GetServerByHostIp(const std::string &hostIp,
Server *out) const {
return GetServer(FindServerByHostIp(hostIp), out);
}
bool GetChunkServer(const std::string &hostIp,
uint32_t port,
ChunkServer *out) const {
return GetChunkServer(FindChunkServer(hostIp, port), out);
}
// getlist
std::list<ChunkServerIdType> GetChunkServerInCluster() const;
std::list<ServerIdType> GetServerInCluster() const;
std::list<ZoneIdType> GetZoneInCluster() const;
std::list<PoolIdType> GetPhysicalPoolInCluster() const;
std::list<PoolIdType> GetLogicalPoolInCluster() const;
std::list<ChunkServerIdType> GetChunkServerInServer(ServerIdType id) const;
std::list<ChunkServerIdType> GetChunkServerInZone(ZoneIdType id) const;
std::list<ChunkServerIdType> GetChunkServerInPhysicalPool(
PoolIdType id) const;
std::list<ServerIdType> GetServerInZone(ZoneIdType id) const;
std::list<ServerIdType> GetServerInPhysicalPool(PoolIdType id) const;
std::list<ZoneIdType> GetZoneInPhysicalPool(PoolIdType id) const;
std::list<PoolIdType> GetLogicalPoolInPhysicalPool(PoolIdType id) const;
std::list<ChunkServerIdType> GetChunkServerInLogicalPool(
PoolIdType id) const;
std::list<ServerIdType> GetServerInLogicalPool(PoolIdType id) const;
std::list<ZoneIdType> GetZoneInLogicalPool(PoolIdType id) const;
std::vector<CopySetIdType> GetCopySetsInLogicalPool(
PoolIdType logicalPoolId) const;
private:
std::unordered_map<PoolIdType, LogicalPool> logicalPoolMap_;
std::unordered_map<PoolIdType, PhysicalPool> physicalPoolMap_;
std::unordered_map<ZoneIdType, Zone> zoneMap_;
std::unordered_map<ServerIdType, Server> serverMap_;
std::unordered_map<ChunkServerIdType, ChunkServer> chunkServerMap_;
std::map<CopySetKey, CopySetInfo> copySetMap_;
std::shared_ptr<TopologyIdGenerator> idGenerator_;
std::shared_ptr<TopologyTokenGenerator> tokenGenerator_;
std::shared_ptr<TopologyStorage> storage_;
//以如下声明的顺序获取锁,防止死锁
mutable curve::common::mutex logicalPoolMutex_;
mutable curve::common::mutex physicalPoolMutex_;
mutable curve::common::mutex zoneMutex_;
mutable curve::common::mutex serverMutex_;
mutable curve::common::mutex chunkServerMutex_;
mutable curve::common::mutex copySetMutex_;
};
} // namespace topology
} // namespace mds
} // namespace curve
#endif // CURVE_SRC_MDS_TOPOLOGY_TOPOLOGY_H_
/*
* Project: curve
* Created Date: Fri Oct 12 2018
* Author: xuchaojie
* Copyright (c) 2018 netease
*/
#include "src/mds/topology/topology_admin.h"
#include <cstdlib>
#include <ctime>
#include <vector>
#include <list>
namespace curve {
namespace mds {
namespace topology {
bool TopologyAdminImpl::AllocateChunkRandomInSingleLogicalPool(
curve::mds::FileType fileType,
uint32_t chunkNumber,
std::vector<CopysetIdInfo> *infos) {
std::vector<PoolIdType> logicalPools;
LogicalPoolType poolType;
switch (fileType) {
case INODE_PAGEFILE: {
poolType = LogicalPoolType::PAGEFILE;
break;
}
case INODE_APPENDFILE: {
poolType = LogicalPoolType::APPENDFILE;
break;
}
case INODE_APPENDECFILE: {
poolType = LogicalPoolType::APPENDECFILE;
break;
}
default:
return false;
break;
}
auto logicalPoolFilter =
[poolType] (const LogicalPool &pool) {
return pool.GetLogicalPoolType() == poolType;
};
FilterLogicalPool(logicalPoolFilter, &logicalPools);
std::srand(std::time(nullptr));
int randomIndex = std::rand() % logicalPools.size();
PoolIdType logicalPoolChosenId = logicalPools[randomIndex];
std::vector<CopySetIdType> copySetIds =
topology_->GetCopySetsInLogicalPool(logicalPoolChosenId);
infos->clear();
for (uint32_t i = 0; i < chunkNumber; i++) {
int randomCopySetIndex = std::rand() % copySetIds.size();
CopysetIdInfo idInfo;
idInfo.logicalPoolId = logicalPoolChosenId;
idInfo.copySetId = copySetIds[randomCopySetIndex];
infos->push_back(idInfo);
}
}
void TopologyAdminImpl::FilterLogicalPool(LogicalPoolFilter filter,
std::vector<PoolIdType> *logicalPoolIdsOut) {
logicalPoolIdsOut->clear();
std::list<PoolIdType> logicalPoolList =
topology_->GetLogicalPoolInCluster();
for (PoolIdType id : logicalPoolList) {
LogicalPool pool;
if (topology_->GetLogicalPool(id, &pool)) {
if (filter(pool)) {
logicalPoolIdsOut->push_back(id);
}
}
}
}
} // namespace topology
} // namespace mds
} // namespace curve
......@@ -9,14 +9,17 @@
#define CURVE_SRC_MDS_TOPOLOGY_TOPOLOGY_ADMIN_H_
#include <vector>
#include <memory>
#include <functional>
#include "src/mds/topology/topology.h"
#include "proto/nameserver2.pb.h"
namespace curve {
namespace mds {
namespace topology {
#define PoolIdType uint16_t
#define CopySetIdType uint32_t
struct CopysetIdInfo {
PoolIdType logicalPoolId;
......@@ -28,13 +31,38 @@ public:
TopologyAdmin() {}
virtual ~TopologyAdmin() {}
virtual bool AllocateChunkRandomInSingleLogicalPool (
::curve::mds::FileType fileType,
::curve::mds::FileType fileType,
uint32_t chunkNumer,
std::vector<CopysetIdInfo> *infos) = 0;
};
class TopologyAdminImpl : public TopologyAdmin {
public:
TopologyAdminImpl(std::shared_ptr<Topology> topology)
: topology_(topology) {}
~TopologyAdminImpl() {}
bool AllocateChunkRandomInSingleLogicalPool(
curve::mds::FileType fileType,
uint32_t chunkNumber,
std::vector<CopysetIdInfo> *infos) override;
private:
using LogicalPoolFilter = std::function<bool (const LogicalPool &)>;
void FilterLogicalPool(LogicalPoolFilter filter,
std::vector<PoolIdType> *logicalPoolIdsOut);
private:
std::shared_ptr<Topology> topology_;
};
} // namespace topology
} // namespace mds
} // namespace curve
#endif // CURVE_SRC_MDS_TOPOLOGY_TOPOLOGY_ADMIN_H_
\ No newline at end of file
#endif // CURVE_SRC_MDS_TOPOLOGY_TOPOLOGY_ADMIN_H_
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment