From f528a82833700bdd56cbad966ae68b77a0fe3f13 Mon Sep 17 00:00:00 2001 From: waylon <1158341873@qq.com> Date: Tue, 16 Apr 2024 15:54:37 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20celery=204=20&=205=20=E5=BC=82=E6=AD=A5?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E7=94=A8=E6=B3=95=E5=85=BC=E5=AE=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../bamboo-pipeline/pipeline/contrib/node_timeout/tasks.py | 6 +++--- .../pipeline/contrib/node_timer_event/tasks.py | 6 +++--- .../pipeline/contrib/plugin_execute/tasks.py | 6 +++--- runtime/bamboo-pipeline/pipeline/contrib/rollback/tasks.py | 6 +++--- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/runtime/bamboo-pipeline/pipeline/contrib/node_timeout/tasks.py b/runtime/bamboo-pipeline/pipeline/contrib/node_timeout/tasks.py index 7e63f6ab..b6d297db 100644 --- a/runtime/bamboo-pipeline/pipeline/contrib/node_timeout/tasks.py +++ b/runtime/bamboo-pipeline/pipeline/contrib/node_timeout/tasks.py @@ -13,7 +13,7 @@ import json import logging -from celery import task +from celery import current_app from pipeline.contrib.node_timeout.settings import node_timeout_settings from pipeline.eri.models import State, Process @@ -24,7 +24,7 @@ logger = logging.getLogger("celery") -@task(acks_late=True) +@current_app.task(acks_late=True) def dispatch_timeout_nodes(record_id: int): record = TimeoutNodesRecord.objects.get(id=record_id) nodes = json.loads(record.timeout_nodes) @@ -40,7 +40,7 @@ def dispatch_timeout_nodes(record_id: int): ) -@task(ignore_result=True) +@current_app.task(ignore_result=True) def execute_node_timeout_strategy(node_id, version): timeout_config = TimeoutNodeConfig.objects.filter(node_id=node_id).only("root_pipeline_id", "action").first() if timeout_config is None: diff --git a/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/tasks.py b/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/tasks.py index 44970a5a..fd904c55 100644 --- a/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/tasks.py +++ b/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/tasks.py @@ -14,7 +14,7 @@ import logging from typing import Any, Dict, List, Type, Union -from celery import task +from celery import current_app from pipeline.contrib.node_timer_event.adapter import NodeTimerEventBaseAdapter from pipeline.contrib.node_timer_event.handlers import ActionManager from pipeline.contrib.node_timer_event.models import ExpiredNodesRecord @@ -24,7 +24,7 @@ logger = logging.getLogger("celery") -@task(acks_late=True) +@current_app.task(acks_late=True) def dispatch_expired_nodes(record_id: int): record: ExpiredNodesRecord = ExpiredNodesRecord.objects.get(id=record_id) node_keys: List[str] = json.loads(record.nodes) @@ -62,7 +62,7 @@ def dispatch_expired_nodes(record_id: int): logger.info("[dispatch_expired_nodes] record deleted: record -> %s", record_id) -@task(ignore_result=True) +@current_app.task(ignore_result=True) def execute_node_timer_event_action(node_id: str, version: str, index: int): adapter_class: Type[NodeTimerEventBaseAdapter] = node_timer_event_settings.adapter_class diff --git a/runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/tasks.py b/runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/tasks.py index b1898431..eccf1908 100644 --- a/runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/tasks.py +++ b/runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/tasks.py @@ -14,7 +14,7 @@ import logging import traceback -from celery import task +from celery import current_app from django.utils import timezone from pipeline.component_framework.library import ComponentLibrary from pipeline.conf.default_settings import PLUGIN_EXECUTE_QUEUE @@ -25,7 +25,7 @@ logger = logging.getLogger("celery") -@task +@current_app.task def execute(task_id): try: plugin_execute_task = PluginExecuteTask.objects.get(id=task_id) @@ -89,7 +89,7 @@ def execute(task_id): ) -@task +@current_app.task def schedule(task_id): try: plugin_execute_task = PluginExecuteTask.objects.get(id=task_id) diff --git a/runtime/bamboo-pipeline/pipeline/contrib/rollback/tasks.py b/runtime/bamboo-pipeline/pipeline/contrib/rollback/tasks.py index 259df0cb..fde31d89 100644 --- a/runtime/bamboo-pipeline/pipeline/contrib/rollback/tasks.py +++ b/runtime/bamboo-pipeline/pipeline/contrib/rollback/tasks.py @@ -2,7 +2,7 @@ import json import logging -from celery import task +from celery import current_app from django.conf import settings from django.db import transaction from pipeline.conf.default_settings import ROLLBACK_QUEUE @@ -287,7 +287,7 @@ def rollback(self): raise e -@task +@current_app.task def token_rollback(snapshot_id, node_id, retry=False, retry_data=None): """ snapshot_id 本次回滚的快照id @@ -296,6 +296,6 @@ def token_rollback(snapshot_id, node_id, retry=False, retry_data=None): TokenRollbackTaskHandler(snapshot_id=snapshot_id, node_id=node_id, retry=retry, retry_data=retry_data).rollback() -@task +@current_app.task def any_rollback(snapshot_id): AnyRollbackHandler(snapshot_id=snapshot_id).rollback()