Skip to content

Commit

Permalink
feat: celery 4 & 5 异步任务用法兼容
Browse files Browse the repository at this point in the history
  • Loading branch information
normal-wls committed Apr 16, 2024
1 parent 3c48fcc commit f528a82
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import json
import logging

from celery import task
from celery import current_app

from pipeline.contrib.node_timeout.settings import node_timeout_settings
from pipeline.eri.models import State, Process
Expand All @@ -24,7 +24,7 @@
logger = logging.getLogger("celery")


@task(acks_late=True)
@current_app.task(acks_late=True)
def dispatch_timeout_nodes(record_id: int):
record = TimeoutNodesRecord.objects.get(id=record_id)
nodes = json.loads(record.timeout_nodes)
Expand All @@ -40,7 +40,7 @@ def dispatch_timeout_nodes(record_id: int):
)


@task(ignore_result=True)
@current_app.task(ignore_result=True)
def execute_node_timeout_strategy(node_id, version):
timeout_config = TimeoutNodeConfig.objects.filter(node_id=node_id).only("root_pipeline_id", "action").first()
if timeout_config is None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import logging
from typing import Any, Dict, List, Type, Union

from celery import task
from celery import current_app
from pipeline.contrib.node_timer_event.adapter import NodeTimerEventBaseAdapter
from pipeline.contrib.node_timer_event.handlers import ActionManager
from pipeline.contrib.node_timer_event.models import ExpiredNodesRecord
Expand All @@ -24,7 +24,7 @@
logger = logging.getLogger("celery")


@task(acks_late=True)
@current_app.task(acks_late=True)
def dispatch_expired_nodes(record_id: int):
record: ExpiredNodesRecord = ExpiredNodesRecord.objects.get(id=record_id)
node_keys: List[str] = json.loads(record.nodes)
Expand Down Expand Up @@ -62,7 +62,7 @@ def dispatch_expired_nodes(record_id: int):
logger.info("[dispatch_expired_nodes] record deleted: record -> %s", record_id)


@task(ignore_result=True)
@current_app.task(ignore_result=True)
def execute_node_timer_event_action(node_id: str, version: str, index: int):

adapter_class: Type[NodeTimerEventBaseAdapter] = node_timer_event_settings.adapter_class
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import logging
import traceback

from celery import task
from celery import current_app
from django.utils import timezone
from pipeline.component_framework.library import ComponentLibrary
from pipeline.conf.default_settings import PLUGIN_EXECUTE_QUEUE
Expand All @@ -25,7 +25,7 @@
logger = logging.getLogger("celery")


@task
@current_app.task
def execute(task_id):
try:
plugin_execute_task = PluginExecuteTask.objects.get(id=task_id)
Expand Down Expand Up @@ -89,7 +89,7 @@ def execute(task_id):
)


@task
@current_app.task
def schedule(task_id):
try:
plugin_execute_task = PluginExecuteTask.objects.get(id=task_id)
Expand Down
6 changes: 3 additions & 3 deletions runtime/bamboo-pipeline/pipeline/contrib/rollback/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import json
import logging

from celery import task
from celery import current_app
from django.conf import settings
from django.db import transaction
from pipeline.conf.default_settings import ROLLBACK_QUEUE
Expand Down Expand Up @@ -287,7 +287,7 @@ def rollback(self):
raise e


@task
@current_app.task
def token_rollback(snapshot_id, node_id, retry=False, retry_data=None):
"""
snapshot_id 本次回滚的快照id
Expand All @@ -296,6 +296,6 @@ def token_rollback(snapshot_id, node_id, retry=False, retry_data=None):
TokenRollbackTaskHandler(snapshot_id=snapshot_id, node_id=node_id, retry=retry, retry_data=retry_data).rollback()


@task
@current_app.task
def any_rollback(snapshot_id):
AnyRollbackHandler(snapshot_id=snapshot_id).rollback()

0 comments on commit f528a82

Please sign in to comment.