Mẹo Hướng dẫn Hướng dẫn celery python example 2022
Bạn đang tìm kiếm từ khóa Hướng dẫn celery python example được Update vào lúc : 2022-09-09 09:35:24 . Với phương châm chia sẻ Thủ Thuật về trong nội dung bài viết một cách Chi Tiết 2022. Nếu sau khi đọc nội dung bài viết vẫn ko hiểu thì hoàn toàn có thể lại Comments ở cuối bài để Admin lý giải và hướng dẫn lại nha.
Trong dự án công trình bất Động sản hiện tại của tớ khi tới phần scaling khối mạng lưới hệ thống thì kiến trúc hiện tại theo phía microservice gặp phải một yếu tố: mọi service trong khối mạng lưới hệ thống đều tương tác trực tiếp với database nên xẩy ra yếu tố càng nhiều service thì sẽ càng nhiều link tới database dẫn đến tình trạng xẩy ra deadlock, performance cũng rất chậm do những link tới database từ những service phải chờ nhau giải phóng.
Nội dung chính
- Về CeleryCác bài toán nên sử dụng CeleryCác hiệu suất cao chính Celery cung cấpCơ chế của CeleryCác module chính của CeleryApplicationSử dụng CeleryCài đặtSử dụngLưu kết quảCấu
hình CeleryTổng kếtTham khảo
Sau khi được gợi ý về việc chuyển sang dùng hàng đợi thay vì để những service thao tác trực
tiếp với database, mình có dành thời hạn tìm hiểu thêm về kiến trúc Queue. Do dự án công trình bất Động sản chạy hầu hết bằng python nên tech lead gợi ý sử dụng Celery, một khối mạng lưới hệ thống quản trị và vận hành queue phổ cập.
Kiến trúc sau khi chuyển sang sử dụng queue trong khối mạng lưới hệ thống của tớ sẽ như sau. Một nội dung bài viết khá rõ ràng về một dạng thiết kế queue là message queue mọi người hoàn toàn có thể đọc thêm ở toidicodedao
Về Celery
- Là một khối mạng lưới hệ thống quản trị và vận hành hàng đợi xử lý task thời hạn thực. Trong khối mạng lưới hệ thống Celery toàn bộ chúng ta sẽ sử dụng khái niệm task in như job ở một số trong những framework khác ví như Sidekiq.Input của celery cần link với một loại message broker còn output hoàn toàn có thể link tới một khối mạng lưới hệ thống backend để tàng trữ kết quả
Mọi người hoàn toàn có thể tìm hiểu thêm một bài
viết khác về Celery trên viblo ở đây. Ngoài ra Celery cũng luôn có thể có một khối mạng lưới hệ thống document rõ ràng và dễ đọc ở trang chủ ://docs.celeryproject.org/en/latest/getting-started/introduction.html.
Các bài toán nên sử dụng Celery
- Chạy background jobsChạy những job lập lịchTính toán
phân tánXử lý tuy nhiên tuy nhiên
Các hiệu suất cao chính Celery phục vụ
- Monitor: giám sát những job/task được đưa vào queueScheduling: chạy những task lập lịch (giống cronjob)Workflows: tạo một luồng xử lý taskTime & Rate Limits: trấn áp số lượng task được thực thi trong một khoảng chừng thời hạn, thời hạn một task được chạy,…Resource Leak Protection: trấn áp tài nguyên trong quy trình xử lý taskUser Component: được cho phép người
dùng tự customize những worker.
Cơ chế của Celery
- Celery hoạt động và sinh hoạt giải trí nhờ vào khái niệm task queue. Đây là cơ chế queue dùng để điều phối những job/work Một trong những máy rất khác nhau. Các worker sẽ nhận task, chạy task và trả về kết quả.Input của queue:
- Task
Các process trên từng worker sẽ theo dõi queue để thực thi những task mới được đẩy vào queueCelery thường dùng một message broker để điều phối task Một trong những clients và worker. Để tạo một
task mới client sẽ thêm một message vào queue, broker tiếp theo này sẽ chuyển message này tới worker. Celery tương hỗ 3 loại broker:
- RabbitMQRedisSQS
Một khối mạng lưới hệ thống sử dụng celery hoàn toàn có thể có nhiều workers và brokers, nhờ vậy việc scale theo chiều ngang sẽ rất thuận tiện và đơn thuần và giản dị.
Các module chính của Celery
Application
Một instance được khởi tạo từ thư viện Celery được gọi là application
Nhiều Celery application hoàn toàn có thể cùng
tồn tại trong một process
Khởi tạo một celery application:
from celery import Celery
app = Celery()
Khi gửi một message tới queue, message này sẽ chỉ chứa tên của task cần thực thi.
Các celery worker sẽ map giữa tên của task với hàm thực thi task đó, việc mapping như vậy được gọi là task registry
@app.task
def add(x, y):
return x + y
Tasks
- Task trong Celery có hai trách nhiệm chính:
- định nghĩa những gì sẽ xẩy ra sau khi một task được gọi (gửi đi message)định nghĩa những gì sẽ
xẩy ra khi một worker nhận được message đó
Mỗi task có một tên riêng không trùng lặp, tên này sẽ tiến hành refer trong message để worker hoàn toàn có thể tìm kiếm được đúng hàm để thực thi. Nếu không định nghĩa tên cho task thì task này sẽ tiến hành tự đặt tên nhờ vào module mà task được định nghĩa và tên function của task.Các message của task sẽ không còn biến thành xóa khỏi queue chừng nào message đó không được một worker xử lý. Một worker hoàn toàn có thể xử lý nhiều message, nếu worker bị crash mà chưa xử lý hết
những message đó thì chúng vẫn hoàn toàn có thể được gửi lại tới một worker khácCác function của task nên ở trạng thái idempotent: function không khiến ra ảnh hưởng gì kể cả khi có bị gọi nhiều lần với cùng một tham số => một task đã thực thi sẽ đảm bảo không biến thành chạy lại lần nữa.
Tạo task
Để tạo task toàn bộ chúng ta dùng decorator @task
from models import User
@app.task(name=’create_new_user’)
def create_user(username, password):
User.objects.create(username=username, password=password)
Để task hoàn toàn có thể retry toàn bộ chúng ta hoàn toàn có thể bound task vào chính instance của nó
@task(bind=True)
def add(self, x, y):
logger.info(self.request.id)
Task cũng hoàn toàn có thể kế
thừa
import celery
class MyTask(celery.Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
print(‘0!r failed: 1!r’.format(task_id, exc))
@task(base=MyTask)
def add(x, y):
raise KeyError()
Để biết thêm thông tin và trạng thái của task toàn bộ chúng ta hoàn toàn có thể sử dụng Task.request
@app.task(bind=True)
def dump_context(self, x, y):
print(‘Executing task id 0.id, args: 0.args!r kwargs: 0.kwargs!r’.format(
self.request))
Celery quản trị và vận hành trạng thái của tasks và hoàn toàn có thể lưu chúng trong những khối mạng lưới hệ thống gọi là result backend. Vòng đời mặc định của task trong Celery gồm:
PENDING: task đợi được thực thi.
STARTED: task đã khởi chạy
SUCCESS: task đã chạy thành công xuất sắc
FAILURE: task gặp lỗi sau khi khởi chạy
RETRY: task đang rất được
chạy lại
REVOKED: task được tịch thu lại
Ngoài những trạng thái mặc định trên toàn bộ chúng ta hoàn toàn có thể tự định nghĩa thêm trạng thái và update trạng thái cho task bằng method update_state
@app.task(bind=True)
def upload_files(self, filenames):
for i, file in enumerate(filenames):
if not self.request.called_directly:
self.update_state(state=’PROGRESS’,
meta=’current’: i, ‘total’: len(filenames))
Gọi task
Celery phục vụ những API để gọi task sau khi đã định nghĩa chúng ở trên.
3 method chính:
- apply_async: gửi task message.delay: gửi task messagecalling: task message sẽ không còn được gửi đi tới
worker mà task sẽ tiến hành thực thi luôn bởi process hiện tại.
Có một task như sau:
@app.task
def add(x, y):
return x + y
Để gọi task này toàn bộ chúng ta sẽ thử dùng 2 method là apply_async và delay
- Với delay toàn bộ chúng ta sẽ viết như sau:
# task.delay(arg1, arg2, kwarg1=’x’, kwarg2=’y’)
add.delay(10, 5)
add.delay(a=10, b=5)
- Dùng apply_async thì phải viết phức tạp hơn một chút ít
# task.apply_async(args=[arg1, arg2], kwargs=’kwarg1′: ‘x’, ‘kwarg2’: ‘y’)
add.apply_async(queue=’low_priority’, args=(10, 5))
add.apply_async(queue=’high_priority’, kwargs=’a’: 10, ‘b’: 5)
Về bản chất delay và apply_async là như nhau nhưng delay đã có sẵn những thiết lập mặc định và toàn bộ chúng ta chỉ hoàn toàn có thể truyền vào những tham số bắt buộc đã định
nghĩa trong function của task, còn với apply_async toàn bộ chúng ta hoàn toàn có thể truyền thêm những tham số khác ví như queue toàn bộ chúng ta muốn gửi message vào,…. Best practice là nên sử dụng apply_async để tiện việc config chạy task tùy từng nhu yếu sử dụng.
Celery tương hỗ việc gọi task theo như hình thức chaining, kết quả của task này hoàn toàn có thể được truyền vào task tiếp theo
add.apply_async((2, 2), link=add.s(16)) # 20
- Nhờ vào cơ chế này toàn bộ chúng ta hoàn toàn có thể thiết kế callback cho task như sau
@app.task
def error_handler(uuid):
result = AsyncResult(uuid)
exc = result.get(propagate=False)
print(‘Task 0 raised exception: 1!rn2!r’.format(
uuid, exc, result.traceback))
add.apply_async((2, 2), link_error=error_handler.s())
Sử dụng Celery
Cài đặt
pip install -U Celery
Sử dụng
Lựa chọn loại message broker phù phù thích hợp với dự án công trình bất Động sản. Như đã nói ở trên Celery tương hỗ 3 loại message broker là RabbitMQ, Redis, SQS. Mình sẽ đi sâu vào phân tích từng loại message broker trong phần sau về Celery.
Tạo một celery worker với task add
from celery Import Celery
app = Celery(‘name of module’, broker=”url_of_broker”)
@app.task
def add(x, y):
return x + y
Chạy worker
$ celery -A tasks worker –loglevel=info
Gọi task
>>> from tasks import add
>>> add.delay(4, 4)
Lưu kết quả
Celery hoàn toàn có thể lưu lại trạng thái của tasks nếu toàn bộ chúng ta cần theo dõi
tasks sau này. Với những khối mạng lưới hệ thống thực thi task theo phương thức state machine thì việc khối mạng lưới hệ thống cần nắm được luồng trạng thái của task là vô cùng quan trọng.
Các khối mạng lưới hệ thống celery dùng để lưu trạng thái task:
- SQLAlchemyMemcachedRedis
Để sử dụng cơ chế lưu kết quả trong Celery toàn bộ chúng ta khai báo celery worker có tham số backend. Ở đây mình sử dụng redis cho toàn bộ việc lưu kết quả task lẫn làm message broker
app = Celery(‘tasks’, backend=’redis://localhost’, broker=”redis://localhost:6379/0″)
Cấu
hình Celery
Cấu hình mặc định cơ bản của celery:
## Broker settings.
broker_url=”redis://localhost:6379/0″
# List of modules to import when the Celery worker starts.
imports = (‘myapp.tasks’,)
## Using the database to store task state and results.
result_backend = ‘db+sqlite:///results.db’
task_annotations = ‘tasks.add’: ‘rate_limit’: ’10/s’
Best practice: tạo một file config riêng cho celery celeryconfig.py
broker_url=”redis://localhost:6379/0://”
result_backend = ‘rpc://’
task_serializer=”json”
result_serializer=”json”
accept_content = [‘json’]
timezone=”Europe/Oslo”
enable_utc = True
task_routes =
‘tasks.add’: ‘low-priority’,
# routing một task tới queue mong ước
Ngoài cách tạo file config trên ra toàn bộ chúng ta cũng hoàn toàn có thể config trực tiếp bằng application của Celery app.conf
app.conf.update(enable_utc=True, timezone=”Europe/London”,)
Tổng kết
Celery tránh việc phải config nhiều mà chỉ việc import từ module sử dụng trực tiếp như sau
from celery Import Celery
app = Celery(‘name of module’, broker=”url_of_broker”)
Worker và client của Celery hoàn toàn có thể tự retry
Một
process của Celery hoàn toàn có thể xử lý hàng triệu task trong một phút với độ trễ chỉ vài miligiây
Celery tương hỗ:
- Message brokers:
- RabbitMQRedisSQS
Xử lý concurrency
- multiprocessingmultithreadsingle threadeventlet, gevent
Lưu trữ kết quả trên những khối mạng lưới hệ thống:
- AmqpRedisMemcachedSQLAlchemyAmazon S3File system
Serialization
- jsonyaml
Ở phần sau nội dung bài viết mình sẽ đi sâu hơn về worker trong Celery và hai loại message broker mà Celery tương hỗ: SQS – Redis, đồng thời dựng một ứng dụng cơ bản sử dụng khối mạng lưới hệ thống này.
Tham khảo
://docs.celeryproject.org/en/latest/://viblo.asia/p./gioi-thieu-celery-maGK7mvBlj2https://pawelzny/python/celery/2022/08/14/celery-4-tasks-best-practices/ Tải thêm tài liệu liên quan đến nội dung bài viết Hướng dẫn celery python example
Reply
0
0
Chia sẻ
Video Hướng dẫn celery python example ?
Bạn vừa đọc nội dung bài viết Với Một số hướng dẫn một cách rõ ràng hơn về Review Hướng dẫn celery python example tiên tiến và phát triển nhất
Chia Sẻ Link Tải Hướng dẫn celery python example miễn phí
Quý khách đang tìm một số trong những Chia Sẻ Link Down Hướng dẫn celery python example miễn phí.
Giải đáp vướng mắc về Hướng dẫn celery python example
Nếu Pro sau khi đọc nội dung bài viết Hướng dẫn celery python example , bạn vẫn chưa hiểu thì hoàn toàn có thể lại phản hồi ở cuối bài để Mình lý giải và hướng dẫn lại nha
#Hướng #dẫn #celery #python