celery筆記一之celery介紹、啟動和運行結果跟蹤

来源:https://www.cnblogs.com/hunterxiong/archive/2023/06/01/17450464.html
-Advertisement-
Play Games

> 本文首發於公眾號:Hunter後端 > 原文鏈接:[celery筆記一之celery介紹、啟動和運行結果跟蹤](https://mp.weixin.qq.com/s/o6enPH4f1qo8WXrl9vO-1w) 本篇筆記內容如下: 1. celery 介紹 2. celery 準備 3. ce ...


本文首發於公眾號:Hunter後端
原文鏈接:celery筆記一之celery介紹、啟動和運行結果跟蹤

本篇筆記內容如下:

  1. celery 介紹
  2. celery 準備
  3. celery 啟動和非同步任務的運行
  4. 運行結果跟蹤

1、celery 介紹

celery 大致有兩種應用場景,一種是非同步任務,一種是定時任務。

比如說在一個介面請求中,某個函數執行所需的時間過長,而前端頁面並不是立刻需要在介面中獲取處理結果,可以將這個函數作為非同步任務,先返回給前端處理中的信息,在後臺單獨運行這個函數,這就是非同步任務。

另一個比如說某個函數需要每天晚上運行一遍,不可能人天天守著後臺手動執行一遍這個函數,那麼就可以用 celery 來實現這個定時的周期任務。

接下來介紹一下 celery 的組成:

task

這個任務就是我們前面舉的例子的非同步任務或者是定時任務,即為 task,我們可以定義這些任務,然後發送到 broker

broker

broker 可以理解成消息中間件,用於獲取非同步或者定時任務,形成一個或多個消息隊列,然後發送給 worker 處理這些消息

broker 的形式可以是 Redis,RabbitMQ 或者其他,這裡我們使用 Redis 作為消息中間件

worker

worker 是處理消息的程式,獲取 broker 中的消息,然後在 worker 中執行,然後根據配置決定將處理結果發送到 backend

result_backend

在 worker 處理完消息之後會有 return 或者沒有返回結果,都會根據配置將結果發送出來,可以配置成發送到 redis 中,也可以將之存儲到 database 中

beat

主要用於調用定時任務,根據設定好的定時任務,比如每天晚上十點執行某個函數,beat 則會在相應的時間將這個 task 發送給 broker,然後 worker 獲取任務進行處理

定時任務除了說的每天晚上十點這種周期任務,也可以是間隔任務,比如說每隔多少秒,多少分鐘執行一次

註意:非同步任務的發送是不經過 beat 處理,直接發送給 broker 的

在上面的結構中,broker 需要將相應的服務比如 redis 運行起來,而 worker 和 beat 需要在手動用程式運行,而且每次更改了定時策略之後需要重新啟動 beat 和 worker 才能生效。

2、celery 準備

接下來我們實現一個最簡單的非同步任務,在執行非同步任務前,我們做如下的準備工作

1.安裝依賴

我們需要安裝一下 celery 和 redis 的依賴:

pip3 install celery==5.1.2 -i https://mirrors.aliyun.com/pypi/simple/
pip3 install redis==3.5.3 -i https://mirrors.aliyun.com/pypi/simple/

2.消息中間件

這裡我們用到的消息中間件是 redis,可以去官網下載一個 redis 啟動,也可以使用 docker 來執行安裝。

我在之前的 docker 系列筆記中有介紹過如何拉取鏡像和運行容器,我們這裡直接使用 docker 來運行:

docker run -itd -p 6379:6379 redis:latest

3.非同步任務準備

我們準備一個最簡單的 add() 函數,放在 tasks.py 文件中:

# tasks.py
from celery import Celery

app = Celery('tasks', broker='redis://localhost/0', backend='redis://localhost/1')

@app.task
def add(x, y): 
    return x + y

在這段代碼里,我們引入 Celery 模塊,並將其實例化為 app,且配置了 broker 參數,表示消息隊列都會被放在 redis 的第一個資料庫下

指定的 backend 參數則表示函數運行的結果被放在 redis 的第二個資料庫下

然後用 @app.task 修飾 add 函數,表示它是 app 下的 task 任務

以上,我們的準備工作就完成了,接下來嘗試運行這個非同步任務

3、celery 啟動和非同步任務的運行

說是 celery 的啟動,其實是 worker 的啟動,中間件是 redis,已經在前面的步驟中啟動了。

我們在 tasks.py 所在的文件夾下執行下麵的命令:

celery -A tasks worker -l INFO

在這裡,tasks 是我們任務所在的文件名,worker 表示啟動的是 worker 程式

-l INFO 則會在控制台列印出 worker 接收到的消息詳情,如果不執行,則信息流不會被列印出來

執行了上面的程式後,可以看到控制台會輸出下麵這種信息:


 -------------- celery@localhost v5.1.2 (sun-harmonics)
--- ***** ----- 
-- ******* ---- Darwin-21.4.0-x86_64-i386-64bit 2022-07-17 23:56:09
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x7fc8ddf3df98
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 12 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . tasks.add

[2022-07-17 23:56:09,685: INFO/MainProcess] Connected to redis://localhost:6379/0
[2022-07-17 23:56:09,699: INFO/MainProcess] mingle: searching for neighbors
[2022-07-17 23:56:10,737: INFO/MainProcess] mingle: all alone
[2022-07-17 23:56:10,780: INFO/MainProcess] celery@localhost ready.

則表示 worker 啟動成功

執行非同步任務

在另一個 shell 視窗,進入 python 的交互界面,輸入以下命令:

from tasks import add
res = add.delay(1,2)

add 是我們需要執行的非同步任務的函數名

delay 是非同步任務執行的特有方法,這個其實是 apply_async() 函數的簡便寫法,不帶任何參數,apply_async() 除了可以實現非同步任務的功能,還可以指定多少時間後執行,比如說二十秒後執行,這個在後面的筆記中我們再介紹。

而非同步任務的返回我們這裡用 res 來定義,它是一個包含了這個任務所有執行信息對象,有任務狀態(是否執行成功),有返回結果(add() 函數的return),還有這個 task 特有的標識 id等信息

至此,我們的一個非同步任務的執行就完成了,我們可以在下一步查看它的運行結果等信息。

4、運行結果跟蹤

接下來,我們在 tasks.py 中建立下麵幾個函數,來測試我們對結果的跟蹤:

# tasks.py

import time
from celery import Celery

app = Celery('tasks', broker='redis://localhost/0', backend='redis://localhost/1')


@app.task
def add(x, y):
    return x + y


@app.task
def div(x, y):
    return x / y


@app.task
def test_not_finished():
    time.sleep(30)
    return True

然後重新運行 worker:

celery -A tasks worker -l INFO

然後引入和執行函數:

from tasks import add, div, test_not_finished

獲取延時任務的結果

res = add.delay(1, 2)
print(res.result)

# 也可以使用 get() 
print(res.get())

get() 函數也可以加個超時的設置:

res.get(timeout=2)

但是這樣需要註意,因為如果超時了還未獲取到結果,程式就會報錯

判斷函數運行是否完成

print(res.ready())

列印出的結果為 True 則表示函數運行完成

我們可以測試函數為完成的狀態:

res2 = test_not_finished.delay()

在這個函數里,我們設置了 30s 的休眠,所以在 30s 內我們列印結果可以看到 res2.ready() 是為 False 的:

print(res2.ready())

獲取task id

每個被執行的 task 都有各自對應的 id 作為它們的唯一鍵:

print(res.id)

查看任務執行的狀態

# 任務執行是否失敗,返回 布爾型數據
is_failed = res.failed()

# 任務執行是否成功,返回布爾型數據
is_successful = res.successful()

# 執行的任務所處的狀態
state = res.state
# state 的值會在 PENDING,STARTED,SUCCESS,RETRY,FAILURE 這幾種狀態中,分別是 待處理中,任務已經開始,成功,重試中,失敗

報錯處理

如果執行的延時任務在程式中報錯,比如我們定義的 div() 函數,我們傳入的除數為 0 的話,在程式中是會報錯的,我們使用 get() 來獲取結果的話程式是會報錯的:

res3 = div.delay(3, 0)
res3.get()

# 返回會報錯

但是我們可以使用 propagate=False 參數來忽略程式的報錯:

res3.get(propagate=False)

這樣我們獲取的就不是程式報錯,而是程式報錯的信息作為結果返回

使用 res3.state 發現返回的結果是 FAILURE

當延時任務在程式中報錯,它的返回值就不會是正確的,我們可以通過 res3.traceback 是否有值來判斷函數運行過程中是有報錯:

if res3.traceback:
    print("延時任務報錯")
else:
    print("程式正常運行,可以獲取返回值")

result資源釋放

因為 backend 會使用資源來保存和傳輸結果,為了確保資源被釋放,所以在執行完非同步任務後,你必須對每一個結果調用 get() 或者 forget() 函數

result.get() 函數獲取結果

result.forget() 在 backend 刪掉該數據

在官方文檔上,意思是 get() 和 forget() 方法都可以釋放資源,但是經過我測試,貌似只有 forget() 函數會釋放資源

查看是否資源被釋放也很簡單,登錄到對應的 backend,我這裡是 redis,使用 redis-cli 或者通過 docker 進入 redis:

select 1

keys*

查看相應的 task id 是否還在列表就可以知道該資源是否被釋放

如果不想手動釋放資源,可以在配置里設置一個過期時間,那麼結果就會在指定時間段後被釋放:

app.conf.update(result_expires=60)

這個我們可以在後面的配置里再詳細介紹。

以上就是本篇筆記全部內容,下一篇筆記我們將介紹如何建立一個 celery 項目、配置的幾種方法及一些基本的配置。

如果想獲取更多後端相關文章,可掃碼關註閱讀:
image


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 這裡給大家分享我在網上總結出來的一些知識,希望對大家有所幫助 面試官:“HTTPS的加密過程你知道麽?” 我:“那肯定知道啊。” 面試官:“那你知道什麼情況下 HTTPS 不安全麽” 我:“這....” 越面覺得自己越菜,繼續努力學習!!! 什麼是中間人攻擊? 中間人攻擊(MITM)在密碼學和電腦 ...
  • # VuePress2.0構建項目文檔系統 參考TerraMours 官網。[https://terramours.site/](https://terramours.site/) 文件結構參考: ![image-20230530170541496](https://www.raokun.top/u ...
  • 當我們在電腦中使用浮點數進行計算時,特別是在使用二進位表示浮點數時,可能會出現舍入誤差。這是由於電腦使用有限的位數來表示浮點數,而某些十進位數無法精確地表示為有限的二進位數。 0.1 和 0.2 都是無限迴圈的二進位數,在轉換為浮點數時並不能完全準確地表示。將它們相加時,可能會出現舍入誤差。因此 ...
  • ## 1. 背景 - 業務背景:CRM系統隨著各業務條線對線索精細化分配的訴求逐漸增加,各個條線的流向規則會越來越複雜,各個條線甚至整個CRM的線索流轉規則急需一種樹形的可視化的圖來表達。 - 技術背景:在開發之前考慮了三種方案,原生canvas、fabric以及G6,三種方案各有優劣勢 |  | ...
  • 上一篇:微服務架構基本原理學習筆記(一) 三、微服務架構 從一個已有的單體架構的應用程式開始進行微服務架構的重構往往是一個不錯的選擇。隨著業務量和功能的增加,我們可以考慮使用微服務架構來擴充應用程式中原有的功能,或者每次添加新功能時,都為其創建一個新的微服務。這比從一開始就選擇使用微服務架構進行設計 ...
  • EasyExcel是一個基於Java的、快速、簡潔、解決大文件記憶體溢出的Excel處理工具。 他能讓你在不用考慮性能、記憶體的等因素的情況下,快速完成Excel的讀、寫等功能。 # 快速入門 導入依賴 ~~~xml com.alibaba easyexcel 3.1.1 ~~~ # 寫 Excel # ...
  • ### 前言 json是我們現代互聯網程式最常用的交互格式,是否你在工作中會遇到前端說欄位不一致需要改的需求,是否遇到過資料庫欄位名與javaBean的規範不同,是否遇到過json與javaBean相互轉換時因為需求寫的土匪代碼,這些都可以用Jackson完成,我們經常和json打交道,而Jacks ...
  • 在Java中,同步(Synchronous)和非同步(Asynchronous)是用來描述程式執行模式的概念。 1. 同步:同步指的是按照程式的順序依次執行代碼,每個操作都會等待前一個操作完成後再執行。同步執行的特點是阻塞,即某個操作的完成會導致後續操作的等待。在多線程編程中,同步可以通過使用鎖(如` ...
一周排行
    -Advertisement-
    Play Games
  • 基於.NET Framework 4.8 開發的深度學習模型部署測試平臺,提供了YOLO框架的主流系列模型,包括YOLOv8~v9,以及其系列下的Det、Seg、Pose、Obb、Cls等應用場景,同時支持圖像與視頻檢測。模型部署引擎使用的是OpenVINO™、TensorRT、ONNX runti... ...
  • 十年沉澱,重啟開發之路 十年前,我沉浸在開發的海洋中,每日與代碼為伍,與演算法共舞。那時的我,滿懷激情,對技術的追求近乎狂熱。然而,隨著歲月的流逝,生活的忙碌逐漸占據了我的大部分時間,讓我無暇顧及技術的沉澱與積累。 十年間,我經歷了職業生涯的起伏和變遷。從初出茅廬的菜鳥到逐漸嶄露頭角的開發者,我見證了 ...
  • C# 是一種簡單、現代、面向對象和類型安全的編程語言。.NET 是由 Microsoft 創建的開發平臺,平臺包含了語言規範、工具、運行,支持開發各種應用,如Web、移動、桌面等。.NET框架有多個實現,如.NET Framework、.NET Core(及後續的.NET 5+版本),以及社區版本M... ...
  • 前言 本文介紹瞭如何使用三菱提供的MX Component插件實現對三菱PLC軟元件數據的讀寫,記錄了使用電腦模擬,模擬PLC,直至完成測試的詳細流程,並重點介紹了在這個過程中的易錯點,供參考。 用到的軟體: 1. PLC開發編程環境GX Works2,GX Works2下載鏈接 https:// ...
  • 前言 整理這個官方翻譯的系列,原因是網上大部分的 tomcat 版本比較舊,此版本為 v11 最新的版本。 開源項目 從零手寫實現 tomcat minicat 別稱【嗅虎】心有猛虎,輕嗅薔薇。 系列文章 web server apache tomcat11-01-官方文檔入門介紹 web serv ...
  • 1、jQuery介紹 jQuery是什麼 jQuery是一個快速、簡潔的JavaScript框架,是繼Prototype之後又一個優秀的JavaScript代碼庫(或JavaScript框架)。jQuery設計的宗旨是“write Less,Do More”,即倡導寫更少的代碼,做更多的事情。它封裝 ...
  • 前言 之前的文章把js引擎(aardio封裝庫) 微軟開源的js引擎(ChakraCore))寫好了,這篇文章整點js代碼來測一下bug。測試網站:https://fanyi.youdao.com/index.html#/ 逆向思路 逆向思路可以看有道翻譯js逆向(MD5加密,AES加密)附完整源碼 ...
  • 引言 現代的操作系統(Windows,Linux,Mac OS)等都可以同時打開多個軟體(任務),這些軟體在我們的感知上是同時運行的,例如我們可以一邊瀏覽網頁,一邊聽音樂。而CPU執行代碼同一時間只能執行一條,但即使我們的電腦是單核CPU也可以同時運行多個任務,如下圖所示,這是因為我們的 CPU 的 ...
  • 掌握使用Python進行文本英文統計的基本方法,並瞭解如何進一步優化和擴展這些方法,以應對更複雜的文本分析任務。 ...
  • 背景 Redis多數據源常見的場景: 分區數據處理:當數據量增長時,單個Redis實例可能無法處理所有的數據。通過使用多個Redis數據源,可以將數據分區存儲在不同的實例中,使得數據處理更加高效。 多租戶應用程式:對於多租戶應用程式,每個租戶可以擁有自己的Redis數據源,以確保數據隔離和安全性。 ...