Skip to content

Commit

Permalink
fix: 优化同步进程状态,增加新业务自动加入2.0,适配828错误 (closed TencentBlueKing#314)
Browse files Browse the repository at this point in the history
  • Loading branch information
wyyalt committed Apr 10, 2024
1 parent ed592b9 commit 222a56f
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 8 deletions.
16 changes: 16 additions & 0 deletions apps/gsekit/cmdb/handlers/cmdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class CMDBHandler(object):
CACHE_GLOBAL_VAR_TEMPLATE = "gsekit:cmdb:biz:{bk_biz_id}:global_variables"
CACHE_TOPO_ATTR_TEMPLATE = "gsekit:cmdb:biz:{bk_biz_id}:topo_tree_attributes"
CACHE_CLOUD_TEMPLATE = "gsekit:cmdb:biz:{bk_biz_id}:clouds"
CACHE_BIZ_ID_NAME = "gsekit:biz_id:name"
BK_BIZ_OBJ_ID = "biz"
BK_SET_OBJ_ID = "set"
BK_MODULE_OBJ_ID = "module"
Expand Down Expand Up @@ -107,6 +108,21 @@ def biz_list(user: User) -> List:

return all_biz_list

@classmethod
def biz_id_name_without_permission(cls) -> Dict[int, str]:
    """Return a mapping of bk_biz_id -> bk_biz_name covering ALL businesses.

    The CMDB query is issued with admin credentials (``use_admin=True``), so no
    per-user permission filtering is applied. Results are cached under
    ``CACHE_BIZ_ID_NAME`` to avoid hitting CMDB on every call.

    :return: dict mapping business ID to business name
    """
    cached_mapping: Dict[int, str] = cache.get(cls.CACHE_BIZ_ID_NAME)
    if cached_mapping:
        return cached_mapping

    # Cache miss (or cached empty value): fetch the full business list from CMDB.
    biz_infos = CCApi.search_business({"fields": ["bk_biz_id", "bk_biz_name"]}, use_admin=True).get("info") or []
    id_to_name: Dict[int, str] = {}
    for biz_info in biz_infos:
        id_to_name[biz_info["bk_biz_id"]] = biz_info["bk_biz_name"]

    # FREQUENT_UPDATE expiry keeps the mapping reasonably fresh for new businesses.
    cache.set(cls.CACHE_BIZ_ID_NAME, id_to_name, gsekit_const.CacheExpire.FREQUENT_UPDATE)
    return id_to_name

@staticmethod
def map_cc3_field_to_cc1(new_field_name: str) -> str:
"""
Expand Down
3 changes: 3 additions & 0 deletions apps/gsekit/meta/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ class KEYS:
PROCESS_TASK_GRANULARITY = "PROCESS_TASK_GRANULARITY" # 进程任务聚合粒度
# GSE 2.0 灰度列表
GSE2_GRAY_SCOPE_LIST = "GSE2_GRAY_SCOPE_LIST"
SYNC_BIZ_PROCESS_STATUS_TIMEOUT = "SYNC_BIZ_PROCESS_STATUS_TIMEOUT"
# 记录所有业务ID,用于同步新业务到灰度列表对比使用
ALL_BIZ_IDS = "ALL_BIZ_IDS"

@classmethod
def process_task_aggregate_info(cls, bk_biz_id: int) -> typing.Dict[str, str]:
Expand Down
34 changes: 34 additions & 0 deletions apps/gsekit/periodic_tasks/sync_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,16 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
"""
from typing import List
from django.db import transaction
import django_celery_beat
from celery.task import periodic_task, task
from django.utils import timezone
from apps.core.gray.handlers import GrayHandler
from apps.gsekit.cmdb.handlers.cmdb import CMDBHandler

from apps.gsekit.job.models import Job
from apps.gsekit.meta.models import GlobalSettings
from apps.gsekit.periodic_tasks.utils import calculate_countdown
from apps.gsekit.process.handlers.process import ProcessHandler
from common.log import logger
Expand All @@ -38,3 +43,32 @@ def sync_process(bk_biz_id=None):
# TODO 由于GSE接口存在延迟,此处暂停同步状态的周期任务,待GSE优化后再开启
# ProcessHandler(bk_biz_id=biz_id).sync_proc_status_to_db()
logger.info(f"[sync_process] bk_biz_id={biz_id} will be run after {countdown} seconds.")


@periodic_task(run_every=django_celery_beat.tzcrontab.TzAwareCrontab(minute="*/30", tz=timezone.get_current_timezone()))
def sync_new_biz_to_gray_scope_list():
    """Periodic task: add newly created businesses to the GSE 2.0 gray-scope list.

    Compares the business IDs currently in CMDB against the snapshot stored in
    ``GlobalSettings.KEYS.ALL_BIZ_IDS``; any IDs present in CMDB but missing
    from the snapshot are treated as new businesses and pushed through
    ``GrayHandler.build``. Runs every 30 minutes.
    """
    task_id = sync_new_biz_to_gray_scope_list.request.id
    logger.info(f"sync_new_biz_to_gray_scope_list: {task_id} Start adding new biz to GSE2_GRAY_SCOPE_LIST.")

    # Snapshot of business IDs recorded on the previous run. An empty snapshot
    # means the feature has not been bootstrapped yet, so do nothing.
    all_biz_ids = GlobalSettings.get_config(key=GlobalSettings.KEYS.ALL_BIZ_IDS, default=[])
    if not all_biz_ids:
        logger.info(f"sync_new_biz_to_gray_scope_list: {task_id} No need to add new biz to GSE2_GRAY_SCOPE_LIST.")
        return None

    cc_all_biz_ids: List[int] = list(CMDBHandler.biz_id_name_without_permission().keys())
    new_biz_ids: List[int] = list(set(cc_all_biz_ids) - set(all_biz_ids))
    logger.info(f"sync_new_biz_to_gray_scope_list: {task_id} new biz ids: {new_biz_ids}.")

    if new_biz_ids:
        # Update the snapshot and apply the gray operation atomically, so a
        # failed build rolls back the snapshot and the biz is retried next run.
        with transaction.atomic():
            GlobalSettings.update_config(key=GlobalSettings.KEYS.ALL_BIZ_IDS, value=cc_all_biz_ids)
            result = GrayHandler.build({"bk_biz_ids": new_biz_ids})
            logger.info(f"sync_new_biz_to_gray_scope_list: {task_id} New biz: {new_biz_ids} Build result: {result}")

    logger.info(f"sync_new_biz_to_gray_scope_list: {task_id} Add new biz to GSE2_GRAY_SCOPE_LIST completed.")
9 changes: 5 additions & 4 deletions apps/gsekit/pipeline_plugins/components/collections/gse.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ class GseDataErrorCode(object):
def need_ignore_err_code(cls, op_type: int, error_code: int) -> bool:
# 对运行中的进程执行【启动】命令,结果是执行失败(不能忽略),业务需要全stop后再重新start
# (主要考量的点在于进程、配置会有更新遗漏导致最终没有生效的风险)
# if op_type == GseOpType.START and error_code == cls.PROC_RUNNING:
# # 启动进程,但进程本身已运行中
# return True
if op_type == GseOpType.START and error_code == cls.PROC_RUNNING:
# 启动进程,但进程本身已运行中
return True

# 对于已停止的进程执行【停止】命令,结果是执行成功,已停止的进程实例可以标记成忽略
if op_type == GseOpType.STOP and error_code == cls.PROC_NO_RUNNING:
Expand Down Expand Up @@ -452,7 +452,8 @@ def _schedule(self, data, parent_data, common_data, callback_data=None):
task_result = self.get_job_task_gse_result(gse_api_result, job_task, common_data)
error_code = task_result.get("error_code")

if error_code == GseDataErrorCode.SUCCESS:
# TODO GSE当前版本并未区分828的单独处理逻辑,所以如果遇到828错误认为成功
if error_code in [GseDataErrorCode.SUCCESS]:
gse_ip_proc_info = json.loads(task_result["content"])
try:
pid = gse_ip_proc_info["process"][0]["instance"][0]["pid"]
Expand Down
25 changes: 21 additions & 4 deletions apps/gsekit/process/handlers/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@
from apps.gsekit import constants
from apps.gsekit.cmdb.handlers.cmdb import CMDBHandler
from apps.gsekit.configfile.models import ConfigTemplateBindingRelationship, ConfigTemplate
from apps.gsekit.meta.models import GlobalSettings
from apps.gsekit.pipeline_plugins.components.collections.gse import NAMESPACE, GseAutoType, GseDataErrorCode, GseOpType
from apps.gsekit.process import exceptions
from apps.gsekit.process.exceptions import (
ProcessDoseNotExistException,
DuplicateProcessInstException,
# DuplicateProcessInstException,
ProcessNotMatchException,
)
from apps.gsekit.process.models import Process, ProcessInst
Expand Down Expand Up @@ -768,8 +769,17 @@ def create_process_inst(self, process_list: List):
uniq_key_set.add(inst.local_inst_id_uniq_key)

# 存在重复进程实例
# 排除重复进程实例不入库,记录日志
if duplicate_proc_instances:
raise DuplicateProcessInstException(uniq_key=duplicate_proc_instances)
to_be_created_inst = [
inst for inst in to_be_created_inst if inst.local_inst_id_uniq_key not in duplicate_proc_instances
]
# 记录日志便于回溯问题
logger.error(
f"sync_biz_process: {self.bk_biz_id} "
f"create_process_inst: duplicate_proc_instances uniq_key->{duplicate_proc_instances}"
)
# raise DuplicateProcessInstException(uniq_key=duplicate_proc_instances)

with transaction.atomic():
if to_be_deleted_inst_condition:
Expand Down Expand Up @@ -1022,7 +1032,13 @@ def get_proc_inst_status_infos(

proc_inst_status_infos = []
uniq_keys_recorded = set()
poll_time, interval, timeout = 0, 1.5, 60
# 全局配置超时时间
timeout: int = GlobalSettings.get_config(GlobalSettings.KEYS.SYNC_BIZ_PROCESS_STATUS_TIMEOUT, 30)
poll_time, interval, = (
0,
1.5,
)
start_time: int = int(time.time())
while True:
try:
gse_api_result = gse_api_helper.get_proc_operate_result(gse_task_id)["data"]
Expand Down Expand Up @@ -1064,7 +1080,8 @@ def get_proc_inst_status_infos(
elif task_result.get("error_code") != GseDataErrorCode.RUNNING:
uniq_keys_recorded.add(meta_key_uniq_key_map[meta_key])

if any([len(uniq_keys_recorded) == len(gse_api_result.keys()), poll_time > timeout]):
end_time: int = int(time.time())
if any([len(uniq_keys_recorded) == len(gse_api_result.keys()), end_time - start_time > timeout]):
break
time.sleep(interval)
poll_time += interval
Expand Down

0 comments on commit 222a56f

Please sign in to comment.