Unverified Commit ca74cf99 authored by irwinsun's avatar irwinsun Committed by GitHub
Browse files

Merge pull request #5536 from sawyersong2/issue5525_github_branch

feat: 适配代码拉取调度优化 #5525
parents 9531bdf0 34f44f3d
Showing with 103 additions and 72 deletions
+103 -72
...@@ -34,6 +34,7 @@ import com.tencent.devops.dispatch.docker.pojo.DockerHostLoadConfig ...@@ -34,6 +34,7 @@ import com.tencent.devops.dispatch.docker.pojo.DockerHostLoadConfig
import com.tencent.devops.dispatch.docker.pojo.DockerIpInfoVO import com.tencent.devops.dispatch.docker.pojo.DockerIpInfoVO
import com.tencent.devops.dispatch.docker.pojo.DockerIpListPage import com.tencent.devops.dispatch.docker.pojo.DockerIpListPage
import com.tencent.devops.dispatch.docker.pojo.DockerIpUpdateVO import com.tencent.devops.dispatch.docker.pojo.DockerIpUpdateVO
import com.tencent.devops.dispatch.docker.pojo.HostDriftLoad
import io.swagger.annotations.Api import io.swagger.annotations.Api
import io.swagger.annotations.ApiOperation import io.swagger.annotations.ApiOperation
import io.swagger.annotations.ApiParam import io.swagger.annotations.ApiParam
...@@ -167,6 +168,6 @@ interface OPDispatchDockerResource { ...@@ -167,6 +168,6 @@ interface OPDispatchDockerResource {
@HeaderParam(AUTH_HEADER_DEVOPS_USER_ID) @HeaderParam(AUTH_HEADER_DEVOPS_USER_ID)
userId: String, userId: String,
@ApiParam("阈值", required = true) @ApiParam("阈值", required = true)
thresholdMap: Map<String, String> hostDriftLoad: HostDriftLoad
): Result<Boolean> ): Result<Boolean>
} }
/*
* Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-CI 蓝鲸持续集成平台 is licensed under the MIT license.
*
* A copy of the MIT License is included in this file.
*
*
* Terms of the MIT License:
* ---------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
* LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
* NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.tencent.devops.dispatch.docker.pojo
data class HostDriftLoad(
val memory: Int,
val cpu: Int,
val disk: Int,
val diskIo: Int,
val usedNum: Int
)
...@@ -177,7 +177,7 @@ class DockerHostClient @Autowired constructor( ...@@ -177,7 +177,7 @@ class DockerHostClient @Autowired constructor(
containerHashId = dispatchMessage.containerHashId, containerHashId = dispatchMessage.containerHashId,
customBuildEnv = dispatchMessage.customBuildEnv, customBuildEnv = dispatchMessage.customBuildEnv,
dockerResource = getDockerResource(dispatchType), dockerResource = getDockerResource(dispatchType),
qpcUniquePath = getQpcUniquePath(dispatchMessage.projectId) qpcUniquePath = getQpcUniquePath(dispatchMessage)
) )
pipelineDockerTaskSimpleDao.createOrUpdate( pipelineDockerTaskSimpleDao.createOrUpdate(
...@@ -500,7 +500,8 @@ class DockerHostClient @Autowired constructor( ...@@ -500,7 +500,8 @@ class DockerHostClient @Autowired constructor(
} }
} }
private fun getQpcUniquePath(projectId: String): String? { private fun getQpcUniquePath(dispatchMessage: DispatchMessage): String? {
val projectId = dispatchMessage.projectId
return if (projectId.startsWith("git_") && return if (projectId.startsWith("git_") &&
dockerHostQpcService.checkQpcWhitelist(projectId.removePrefix("git_")) dockerHostQpcService.checkQpcWhitelist(projectId.removePrefix("git_"))
) { ) {
......
...@@ -34,6 +34,7 @@ import com.tencent.devops.dispatch.docker.pojo.DockerHostLoadConfig ...@@ -34,6 +34,7 @@ import com.tencent.devops.dispatch.docker.pojo.DockerHostLoadConfig
import com.tencent.devops.dispatch.docker.pojo.DockerIpInfoVO import com.tencent.devops.dispatch.docker.pojo.DockerIpInfoVO
import com.tencent.devops.dispatch.docker.pojo.DockerIpListPage import com.tencent.devops.dispatch.docker.pojo.DockerIpListPage
import com.tencent.devops.dispatch.docker.pojo.DockerIpUpdateVO import com.tencent.devops.dispatch.docker.pojo.DockerIpUpdateVO
import com.tencent.devops.dispatch.docker.pojo.HostDriftLoad
import com.tencent.devops.dispatch.docker.service.DispatchDockerService import com.tencent.devops.dispatch.docker.service.DispatchDockerService
@RestResource @RestResource
...@@ -88,7 +89,7 @@ class OPDispatchDockerResourceImpl constructor( ...@@ -88,7 +89,7 @@ class OPDispatchDockerResourceImpl constructor(
return Result(dispatchDockerService.getDockerDriftThreshold(userId)) return Result(dispatchDockerService.getDockerDriftThreshold(userId))
} }
override fun updateDockerDriftThreshold(userId: String, thresholdMap: Map<String, String>): Result<Boolean> { override fun updateDockerDriftThreshold(userId: String, hostDriftLoad: HostDriftLoad): Result<Boolean> {
return Result(dispatchDockerService.updateDockerDriftThreshold(userId, thresholdMap)) return Result(dispatchDockerService.updateDockerDriftThreshold(userId, hostDriftLoad))
} }
} }
...@@ -115,7 +115,7 @@ class PipelineDockerHostDao { ...@@ -115,7 +115,7 @@ class PipelineDockerHostDao {
dslContext: DSLContext, dslContext: DSLContext,
projectId: String, projectId: String,
type: DockerHostType = DockerHostType.BUILD type: DockerHostType = DockerHostType.BUILD
): List<String> { ): Set<String> {
with(TDispatchPipelineDockerHost.T_DISPATCH_PIPELINE_DOCKER_HOST) { with(TDispatchPipelineDockerHost.T_DISPATCH_PIPELINE_DOCKER_HOST) {
val result = dslContext.select(HOST_IP).from(this) val result = dslContext.select(HOST_IP).from(this)
.where(PROJECT_CODE.eq(projectId)) .where(PROJECT_CODE.eq(projectId))
...@@ -123,9 +123,9 @@ class PipelineDockerHostDao { ...@@ -123,9 +123,9 @@ class PipelineDockerHostDao {
.fetchOne(HOST_IP) .fetchOne(HOST_IP)
return if (result != null && result.isNotEmpty()) { return if (result != null && result.isNotEmpty()) {
result.split(",") result.split(",").toSet()
} else { } else {
emptyList() emptySet()
} }
} }
} }
......
...@@ -114,8 +114,8 @@ class DockerVMListener @Autowired constructor( ...@@ -114,8 +114,8 @@ class DockerVMListener @Autowired constructor(
var poolNo = 0 var poolNo = 0
try { try {
// 先判断是否OP已配置专机,若配置了专机,看当前ip是否在专机列表中,若在 选择当前IP并检查负载,若不在从专机列表中选择一个容量最小的 // 先判断是否OP已配置专机,若配置了专机,看当前ip是否在专机列表中,若在 选择当前IP并检查负载,若不在从专机列表中选择一个容量最小的
val specialIpSet = pipelineDockerHostDao.getHostIps(dslContext, dispatchMessage.projectId).toSet() val specialIpSet = pipelineDockerHostDao.getHostIps(dslContext, dispatchMessage.projectId)
logger.info("${dispatchMessage.projectId}| specialIpSet: $specialIpSet") logger.info("${dispatchMessage.projectId}| specialIpSet: $specialIpSet -- ${specialIpSet.size}")
val taskHistory = pipelineDockerTaskSimpleDao.getByPipelineIdAndVMSeq( val taskHistory = pipelineDockerTaskSimpleDao.getByPipelineIdAndVMSeq(
dslContext = dslContext, dslContext = dslContext,
...@@ -138,31 +138,9 @@ class DockerVMListener @Autowired constructor( ...@@ -138,31 +138,9 @@ class DockerVMListener @Autowired constructor(
) )
} else { } else {
driftIpInfo = JsonUtil.toJson(dockerIpInfo.intoMap()) driftIpInfo = JsonUtil.toJson(dockerIpInfo.intoMap())
// 根据当前IP负载选择IP
dockerPair = if (specialIpSet.isNotEmpty() && specialIpSet.toString() != "[]") { val pair = dockerHostUtils.checkAndSetIP(dispatchMessage, specialIpSet, dockerIpInfo, poolNo)
// 该项目工程配置了专机 dockerPair = Pair(pair.first, pair.second)
if (specialIpSet.contains(taskHistory.dockerIp) && dockerIpInfo.enable) {
// 上一次构建IP在专机列表中,直接重用
Pair(taskHistory.dockerIp, dockerIpInfo.dockerHostPort)
} else {
// 不在专机列表中,重新依据专机列表去选择负载最小的
driftIpInfo = "专机漂移"
dockerHostUtils.getAvailableDockerIpWithSpecialIps(
dispatchMessage.projectId,
dispatchMessage.pipelineId,
dispatchMessage.vmSeqId,
specialIpSet
)
}
} else {
// 没有配置专机,根据当前IP负载选择IP
val triple = dockerHostUtils.checkAndSetIP(dispatchMessage, specialIpSet, dockerIpInfo, poolNo)
if (triple.third.isNotEmpty()) {
driftIpInfo = triple.third
}
Pair(triple.first, triple.second)
}
} }
} else { } else {
// 第一次构建,根据负载条件选择可用IP // 第一次构建,根据负载条件选择可用IP
......
...@@ -34,6 +34,7 @@ import com.tencent.devops.dispatch.docker.pojo.DockerHostLoadConfig ...@@ -34,6 +34,7 @@ import com.tencent.devops.dispatch.docker.pojo.DockerHostLoadConfig
import com.tencent.devops.dispatch.docker.pojo.DockerIpInfoVO import com.tencent.devops.dispatch.docker.pojo.DockerIpInfoVO
import com.tencent.devops.dispatch.docker.pojo.DockerIpListPage import com.tencent.devops.dispatch.docker.pojo.DockerIpListPage
import com.tencent.devops.dispatch.docker.pojo.DockerIpUpdateVO import com.tencent.devops.dispatch.docker.pojo.DockerIpUpdateVO
import com.tencent.devops.dispatch.docker.pojo.HostDriftLoad
import com.tencent.devops.dispatch.docker.pojo.enums.DockerHostClusterType import com.tencent.devops.dispatch.docker.pojo.enums.DockerHostClusterType
import com.tencent.devops.dispatch.docker.utils.CommonUtils import com.tencent.devops.dispatch.docker.utils.CommonUtils
import com.tencent.devops.dispatch.docker.utils.DockerHostUtils import com.tencent.devops.dispatch.docker.utils.DockerHostUtils
...@@ -213,14 +214,10 @@ class DispatchDockerService @Autowired constructor( ...@@ -213,14 +214,10 @@ class DispatchDockerService @Autowired constructor(
return mapOf("threshold" to dockerHostUtils.getDockerDriftThreshold().toString()) return mapOf("threshold" to dockerHostUtils.getDockerDriftThreshold().toString())
} }
fun updateDockerDriftThreshold(userId: String, thresholdMap: Map<String, String>): Boolean { fun updateDockerDriftThreshold(userId: String, hostDriftLoad: HostDriftLoad): Boolean {
logger.info("$userId updateDockerDriftThreshold $thresholdMap") logger.info("$userId updateDockerDriftThreshold $hostDriftLoad")
val threshold = (thresholdMap["threshold"] ?: error("Parameter threshold must in (0-100).")).toInt()
if (threshold < 0 || threshold > 100) {
throw IllegalArgumentException("Parameter threshold must in (0-100).")
}
dockerHostUtils.updateDockerDriftThreshold(threshold) dockerHostUtils.updateDockerDriftThreshold(hostDriftLoad)
return true return true
} }
} }
...@@ -27,6 +27,7 @@ ...@@ -27,6 +27,7 @@
package com.tencent.devops.dispatch.docker.service package com.tencent.devops.dispatch.docker.service
import com.tencent.devops.common.dispatch.sdk.pojo.DispatchMessage
import com.tencent.devops.common.redis.RedisOperation import com.tencent.devops.common.redis.RedisOperation
import com.tencent.devops.dispatch.docker.common.Constants import com.tencent.devops.dispatch.docker.common.Constants
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
...@@ -67,6 +68,17 @@ class DockerHostQpcService @Autowired constructor( ...@@ -67,6 +68,17 @@ class DockerHostQpcService @Autowired constructor(
return redisOperation.isMember(Constants.QPC_WHITE_LIST_KEY_PREFIX, gitProjectId) return redisOperation.isMember(Constants.QPC_WHITE_LIST_KEY_PREFIX, gitProjectId)
} }
fun getQpcUniquePath(dispatchMessage: DispatchMessage): String? {
val projectId = dispatchMessage.projectId
return if (projectId.startsWith("git_") &&
checkQpcWhitelist(projectId.removePrefix("git_"))
) {
return projectId.removePrefix("git_")
} else {
null
}
}
companion object { companion object {
private val LOG = LoggerFactory.getLogger(DockerHostQpcService::class.java) private val LOG = LoggerFactory.getLogger(DockerHostQpcService::class.java)
} }
......
...@@ -43,13 +43,14 @@ import com.tencent.devops.dispatch.docker.dao.PipelineDockerTaskDriftDao ...@@ -43,13 +43,14 @@ import com.tencent.devops.dispatch.docker.dao.PipelineDockerTaskDriftDao
import com.tencent.devops.dispatch.docker.dao.PipelineDockerTaskSimpleDao import com.tencent.devops.dispatch.docker.dao.PipelineDockerTaskSimpleDao
import com.tencent.devops.dispatch.docker.exception.DockerServiceException import com.tencent.devops.dispatch.docker.exception.DockerServiceException
import com.tencent.devops.dispatch.docker.pojo.DockerHostLoadConfig import com.tencent.devops.dispatch.docker.pojo.DockerHostLoadConfig
import com.tencent.devops.dispatch.docker.pojo.HostDriftLoad
import com.tencent.devops.dispatch.docker.pojo.enums.DockerHostClusterType import com.tencent.devops.dispatch.docker.pojo.enums.DockerHostClusterType
import com.tencent.devops.dispatch.docker.service.DockerHostQpcService
import com.tencent.devops.dispatch.pojo.enums.PipelineTaskStatus import com.tencent.devops.dispatch.pojo.enums.PipelineTaskStatus
import com.tencent.devops.model.dispatch.tables.records.TDispatchPipelineDockerIpInfoRecord import com.tencent.devops.model.dispatch.tables.records.TDispatchPipelineDockerIpInfoRecord
import org.jooq.DSLContext import org.jooq.DSLContext
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.stereotype.Component import org.springframework.stereotype.Component
import java.util.Random import java.util.Random
...@@ -64,19 +65,17 @@ class DockerHostUtils @Autowired constructor( ...@@ -64,19 +65,17 @@ class DockerHostUtils @Autowired constructor(
private val pipelineDockerTaskDriftDao: PipelineDockerTaskDriftDao, private val pipelineDockerTaskDriftDao: PipelineDockerTaskDriftDao,
private val pipelineDockerTaskSimpleDao: PipelineDockerTaskSimpleDao, private val pipelineDockerTaskSimpleDao: PipelineDockerTaskSimpleDao,
private val pipelineDockerBuildDao: PipelineDockerBuildDao, private val pipelineDockerBuildDao: PipelineDockerBuildDao,
private val dockerHostQpcService: DockerHostQpcService,
private val dslContext: DSLContext private val dslContext: DSLContext
) { ) {
companion object { companion object {
private const val LOAD_CONFIG_KEY = "dockerhost-load-config" private const val LOAD_CONFIG_KEY = "dockerhost-load-config"
private const val DOCKER_DRIFT_THRESHOLD_KEY = "docker-drift-threshold-spKyQ86qdYhAkDDR" private const val DOCKER_DRIFT_THRESHOLD_KEY = "dispatchdocker:drift-threshold-spKyQ86qdYhAkDDR"
private const val BUILD_POOL_SIZE = 100 // 单个流水线可同时执行的任务数量 private const val BUILD_POOL_SIZE = 100 // 单个流水线可同时执行的任务数量
private val logger = LoggerFactory.getLogger(DockerHostUtils::class.java) private val logger = LoggerFactory.getLogger(DockerHostUtils::class.java)
} }
@Value("\${dispatch.defaultAgentLessIp:127.0.0.1}")
val defaultAgentLessIp: String = ""
fun getAvailableDockerIpWithSpecialIps( fun getAvailableDockerIpWithSpecialIps(
projectId: String, projectId: String,
pipelineId: String, pipelineId: String,
...@@ -124,13 +123,6 @@ class DockerHostUtils @Autowired constructor( ...@@ -124,13 +123,6 @@ class DockerHostUtils @Autowired constructor(
} }
if (dockerPair.first.isEmpty()) { if (dockerPair.first.isEmpty()) {
// agentless方案升级兼容
logger.info("defaultAgentLessIp: $defaultAgentLessIp")
if (clusterName == DockerHostClusterType.AGENT_LESS && defaultAgentLessIp.isNotEmpty()) {
val defaultAgentLessIpList = defaultAgentLessIp.split(",")
return Pair(defaultAgentLessIpList[Random().nextInt(defaultAgentLessIpList.size)], 80)
}
if (specialIpSet.isNotEmpty()) { if (specialIpSet.isNotEmpty()) {
throw DockerServiceException(errorType = ErrorCodeEnum.NO_SPECIAL_VM_ERROR.errorType, throw DockerServiceException(errorType = ErrorCodeEnum.NO_SPECIAL_VM_ERROR.errorType,
errorCode = ErrorCodeEnum.NO_SPECIAL_VM_ERROR.errorCode, errorCode = ErrorCodeEnum.NO_SPECIAL_VM_ERROR.errorCode,
...@@ -248,45 +240,58 @@ class DockerHostUtils @Autowired constructor( ...@@ -248,45 +240,58 @@ class DockerHostUtils @Autowired constructor(
specialIpSet: Set<String>, specialIpSet: Set<String>,
dockerIpInfo: TDispatchPipelineDockerIpInfoRecord, dockerIpInfo: TDispatchPipelineDockerIpInfoRecord,
poolNo: Int poolNo: Int
): Triple<String, Int, String> { ): Pair<String, Int> {
val dockerIp = dockerIpInfo.dockerIp
// 查看当前IP负载情况,当前IP不可用或者负载超额或者设置为专机独享或者是否灰度已被切换,重新选择构建机 // 查看当前IP负载情况,当前IP不可用或者负载超额或者设置为专机独享或者是否灰度已被切换,重新选择构建机
val threshold = getDockerDriftThreshold() val hostDriftLoad = getDockerDriftThreshold()
if (!dockerIpInfo.enable || if (!dockerIpInfo.enable ||
dockerIpInfo.diskLoad > 90 || dockerIpInfo.diskLoad > hostDriftLoad.disk ||
dockerIpInfo.diskIoLoad > 85 || dockerIpInfo.diskIoLoad > hostDriftLoad.diskIo ||
dockerIpInfo.memLoad > threshold || dockerIpInfo.memLoad > hostDriftLoad.memory ||
dockerIpInfo.specialOn || dockerIpInfo.cpuLoad > hostDriftLoad.cpu ||
(dockerIpInfo.specialOn && !specialIpSet.contains(dockerIpInfo.dockerIp)) ||
(dockerIpInfo.grayEnv != gray.isGray()) || (dockerIpInfo.grayEnv != gray.isGray()) ||
(dockerIpInfo.usedNum > 40 && dockerIpInfo.memLoad > 60)) { (dockerIpInfo.usedNum > hostDriftLoad.usedNum) ||
val pair = getAvailableDockerIpWithSpecialIps( dockerHostQpcService.getQpcUniquePath(dispatchMessage) != null) {
return getAvailableDockerIpWithSpecialIps(
dispatchMessage.projectId, dispatchMessage.projectId,
dispatchMessage.pipelineId, dispatchMessage.pipelineId,
dispatchMessage.vmSeqId, dispatchMessage.vmSeqId,
specialIpSet specialIpSet
) )
return Triple(pair.first, pair.second, "")
} }
return Triple(dockerIp, dockerIpInfo.dockerHostPort, "") return Pair(dockerIpInfo.dockerIp, dockerIpInfo.dockerHostPort)
} }
fun updateDockerDriftThreshold(threshold: Int) { fun updateDockerDriftThreshold(hostDriftLoad: HostDriftLoad) {
redisOperation.set(DOCKER_DRIFT_THRESHOLD_KEY, threshold.toString()) redisOperation.set(
key = DOCKER_DRIFT_THRESHOLD_KEY,
value = JsonUtil.toJson(hostDriftLoad),
expired = false
)
} }
fun getDockerDriftThreshold(): Int { fun getDockerDriftThreshold(): HostDriftLoad {
val thresholdStr = redisOperation.get(DOCKER_DRIFT_THRESHOLD_KEY) val thresholdStr = redisOperation.get(DOCKER_DRIFT_THRESHOLD_KEY)
return if (thresholdStr != null && thresholdStr.isNotEmpty()) { return if (thresholdStr != null && thresholdStr.isNotEmpty()) {
thresholdStr.toInt() JsonUtil.to(thresholdStr, HostDriftLoad::class.java)
} else { } else {
90 HostDriftLoad(
cpu = 80,
memory = 70,
disk = 80,
diskIo = 80,
usedNum = 40
)
} }
} }
fun createLoadConfig(loadConfigMap: Map<String, DockerHostLoadConfig>) { fun createLoadConfig(loadConfigMap: Map<String, DockerHostLoadConfig>) {
redisOperation.set(LOAD_CONFIG_KEY, JsonUtil.toJson(loadConfigMap)) redisOperation.set(
key = LOAD_CONFIG_KEY,
value = JsonUtil.toJson(loadConfigMap),
expired = false
)
} }
fun getLoadConfig(): Triple<DockerHostLoadConfig, DockerHostLoadConfig, DockerHostLoadConfig> { fun getLoadConfig(): Triple<DockerHostLoadConfig, DockerHostLoadConfig, DockerHostLoadConfig> {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment