Trăn thợ cần tây

Trong phần này, chúng ta sẽ tìm hiểu cách xây dựng các tác vụ nền có thể thực thi độc lập với phần mềm máy chủ Web

Để giúp các bạn dễ theo dõi, sau đây là danh sách các bài viết trong chuỗi bài hướng dẫn này

  • Phần 1. Chào thế giới
  • Phần 2. Tìm hiểu về mẫu
  • Phần 3. Tìm hiểu về Web Forms
  • Phần 4. Sử dụng cơ sở dữ liệu
  • Phần 5. Xử lý đăng nhập
  • Phần 6. Hồ sơ cá nhân và ảnh đại diện
  • Phần 7. Xử lý lỗi
  • Phần 8. Tạo chức năng theo dõi
  • Phần 9. Partition
  • Phần 10. Email hỗ trợ
  • Phần 11. Nâng cấp giao diện
  • Phần 12. Xử lý thời gian
  • Phần 13. Hỗ trợ đa ngôn ngữ
  • Phần 14. Sử dụng Ajax
  • Phần 15. Tinh chỉnh cấu trúc ứng dụng
  • Phần 16. Hỗ trợ tìm kiếm hỗ trợ
  • Phần 17. Triển khai ứng dụng trên Linux
  • Phần 18. Triển khai ứng dụng với Heroku
  • Phần 19. Triển khai ứng dụng với Docker
  • Phần 20. JavaScript nâng cao
  • Phần 21. Thông báo cho người sử dụng
  • Phần 22. Tìm hiểu về nhiệm vụ nền [Bài viết này]
  • Phần 23. Xây dựng API

Bạn có thể truy cập mã nguồn cho phần này tại GitHub

Phần này sẽ tập trung vào các kỹ thuật cần thiết để xây dựng các tiến trình phức tạp và yêu cầu thời gian thực hiện thi dài trong ứng dụng. Các tiến trình này không thể thi hành đồng bộ với các yêu cầu từ chương trình khách [máy khách] bởi vì quá trình thực thi chúng sẽ kéo dài và sẽ không cho phép bất kỳ câu trả lời nào từ máy chủ về phía máy khách trong suốt thời gian . Trong Phần 10, chúng ta đã có dịp tìm hiểu một chút về chủ đề này khi chuyển chức năng gửi email thành một luồng [thread] chạy ở chế độ nền [background] để người dùng không phải đợi 3-4 giây mỗi lần. . Tuy nhiên, dù giải pháp này hoạt động tốt cho nhiệm vụ gửi email, nó lại không hiệu quả lắm khi các tiến trình yêu cầu thời gian thực thi dài hơn. Trong thực tế, giải pháp được sử dụng rộng rãi hơn là đưa các nhiệm vụ có thời gian thực thi dài vào các tiến trình phục vụ [worker process]

Để minh họa cho nhu cầu về các tiến trình có thời gian thực thi dài, chúng ta sẽ tìm hiểu và xây dựng chức năng xuất tệp [export]. Chức năng này sẽ cho phép người dùng nhận được một tệp dữ liệu có chứa tất cả các bài viết của họ. Khi người dùng sử dụng chức năng này, ứng dụng sẽ khởi động tác vụ xuất tệp để tạo ra tệp JSON chứa toàn bộ bài viết của người dùng đó và gửi đến họ qua email. Tất cả các hoạt động này sẽ được diễn ra trong một tiến trình máy chủ, đồng thời hiển thị tiến độ công việc thông qua một thông báo để người dùng có thể quan sát được định mức hoàn thành theo tỷ lệ phần trăm.

Giới thiệu về task queue [Task Queue]

Nhiệm vụ hàng đợi là một giải pháp thuận lợi cho ứng dụng để thực thi một nhiệm vụ nhờ vào một quy trình phục vụ. Tiến trình phục vụ được thực thi độc lập với ứng dụng và thậm chí có thể được đặt trên một hệ thống khác. Giao tiếp giữa ứng dụng và trình phục vụ sẽ được thực hiện thông qua hàng đợi thông báo [hàng đợi tin nhắn]. Ứng dụng sẽ gửi công việc đến hàng đợi và theo dõi tiến trình công việc bằng cách tương tác với hàng đợi như sơ đồ sau

Ứng dụng biến phổ biến nhất cho hàng đợi nhiệm vụ trong Python hiện nay là Celery. Đây là một ứng dụng tương đối phức tạp với nhiều tùy chọn và hỗ trợ một số kiểu hàng đợi khác nhau. Một ứng dụng hàng đợi Python phổ biến khác là Redis Queue [thường được gọi tắt là RQ] và không hoạt động bằng Celery [như chỉ hỗ trợ hàng đợi thông báo của Redis], nhưng lại dễ sử dụng hơn rất nhiều

Cả Celery và RQ đều có thể hỗ trợ tốt cho các tác vụ nền trong ứng dụng Flask, vì vậy, chúng tôi sẽ sử dụng RQ cho ứng dụng của chúng tôi vì tính đơn giản của nó. Tuy nhiên, việc xây dựng chức năng này với Celery cũng không phải là quá khó. Nếu bạn thích sử dụng Celery hơn RQ, chúng ta có thể quay lại chủ đề này trong một bài viết khác

Use RQ

RQ là một gói Python chuẩn và có thể được cài đặt bằng pip

1

2

[myenv] $ pip3 install rq

[myenv] $ pip3 freeze > requirements.txt

Như đã nói ở trên, liên lạc giữa ứng dụng và các trình phục vụ của RQ sẽ được thực hiện thông qua một hàng đợi thông báo của Redis, vì vậy bạn phải thiết lập một máy chủ Redis. Chúng ta có nhiều tùy chọn khác nhau để cài đặt và thực hiện một máy chủ Redis như là sử dụng chương trình cài đặt hoặc tải về mã nguồn và biên dịch chúng trên hệ thống của bạn. Nếu sử dụng Windows, bạn có thể tải trình cài đặt Redis server tại đây. Trên Linux, bạn có thể cài đặt thông qua các tiện ích quản lý gói tương ứng với bản phân phối Linux mà bạn đang sử dụng. Với Mac OS X, bạn có thể sử dụng lệnh brew install redis [sau khi đã cài đặt homebrew] và khởi động lại thủ công redis bằng lệnh redis-server

Trong các hệ thống sử dụng Debian hoặc Ubuntu, bạn có thể cài đặt phần mềm Redis bằng các lệnh sau

1

2

3

$ sudo apt -y update # Cập nhật thông tin về các package trong hệ thống

$ sudo apt -y install redis-server # Cài đặt Redis Server

$ sudo systemctl enable --now redis-server.service # Khởi động và cho phép Redis Server tự khởi động khi khởi động.

Sau khi khởi động, bạn chỉ cần đảm bảo rằng máy chủ đang hoạt động và cho phép truy cập RQ, ngoài ra bạn sẽ không cần phải làm gì thêm

Cần lưu ý rằng RQ không chạy bằng trình thông dịch Python cho Windows. Nếu bạn đang sử dụng Windows, bạn cần thực thi RQ trong môi trường mô phỏng Unix. Bạn có thể làm điều này bằng một trong hai công cụ. Cygwin hoặc Hệ thống con Windows cho Linux [WSL]. Cả hai công cụ này đều tương thích với RQ

Khởi tạo một nhiệm vụ [Tác vụ]

Chúng ta sẽ bắt đầu bằng một ví dụ khởi động nhiệm vụ đơn giản trong RQ để bạn làm quen với nó. Một nhiệm vụ chỉ là một hàm Python. Sau đây là một ví dụ về nhiệm vụ và chúng ta sẽ đưa nó vào một mô-đun mới là ứng dụng/tác vụ. py

ứng dụng/nhiệm vụ. py. Ví dụ về nhiệm vụ nền

1

2

3

4

5

6

7

8

nhập thời gian

 

def ví dụ[giây]:

    in['Bắt đầu nhiệm vụ']> từ redis nhập Redisnhập Redis công việc = hàng đợi. enqueue['ứng dụng. nhiệm vụ. dụ', 30]

>>> công việc. get_id[]

'ddf1685b-3fd1-4e9b-9e4a-e1cabc58586f'

Lớp Queue trong Redis sẽ đại diện cho nhiệm vụ chờ đợi hàng trong ứng dụng. Để khởi tạo một thực thể của lớp này, chúng ta cần tham số là tên hàng đợi và một đối tượng kết nối Redis [được khởi tạo với URL mặc định trong trường hợp này]. Nếu máy chủ Redis của bạn nằm trên một máy chủ khác hoặc sử dụng một cổng khác, bạn phải cung cấp URL tương ứng

Phương thức enqueue[] của hàng sẽ thêm một nhiệm vụ vào hàng đợi. Tham số đầu tiên của phương thức này là tên của nhiệm vụ bạn muốn thực hiện và được truyền trực tiếp bằng một hàm đối tượng hoặc là một chuỗi tham chiếu. Trong phạm vi của bài viết này, việc sử dụng chuỗi tham chiếu thuận lợi hơn vì chúng ta không cần phải thực hiện hàm tham chiếu trong ứng dụng. Các tham số còn lại cho phương thức enqueue[] sẽ được truyền vào hàm được thực hiện bởi tiến trình phục vụ

Ngay khi gọi enqueue[], bạn sẽ thấy một số hoạt động trên cửa sổ lệnh đầu tiên đang chạy trình phục vụ của RQ. Bạn sẽ thấy hàm example[] được thực thi và trong bộ đếm từng giây. Cùng lúc đó, bạn vẫn có thể tiếp tục thực hiện các chỉ thị trong phiên làm việc với ứng dụng trong cửa sổ lệnh thứ hai. Trong ví dụ trên, chúng tôi gọi phương thức myblog-tasks0 để nhận được danh sách đã được giao cho nhiệm vụ. Để kiểm tra nhiệm vụ đã hoàn tất hay chưa, bạn có thể thử sử dụng một biểu thức thú vị khác với các đối tượng myblog-tasks1 như sau

1

2

>>> công việc. is_finished

Sai

Nếu bạn sử dụng giá trị 30 như trong ví dụ trên, hàm này sẽ được thực thi trong 30 giây. Sau khoảng thời gian này, biểu thức myblog-tasks2 sẽ trở thành True, đơn giản nhưng cũng rất hiệu quả

Sau khi tác vụ đã được thi hành và hoàn tất, trình phục vụ sẽ trở lại trạng thái chờ cho các tác vụ tiếp theo trong hàng đợi. Vì vậy, bạn có thể lặp lại cách gọi của phương thức enqueue[] với các tham số khác nếu bạn muốn tìm hiểu về nó. Dữ liệu cho các nhiệm vụ sẽ được lưu trong hàng đợi trong khoảng thời gian định trước [500 giây theo mặc định], nhưng sẽ bị xóa sau đó. Điều này rất quan trọng, bạn cần nhớ rằng hàng đợi sẽ không lưu lại lịch sử của các nhiệm vụ đã được thực hiện thông qua nó

Báo cáo tiến độ thực hiện nhiệm vụ

Các nhiệm vụ trong ví dụ trên quá đơn giản và không có tác dụng thực tế nào. Thường thì đối với các nhiệm vụ có thời gian thực thi kéo dài, chúng ta muốn có một số thông tin về tiến độ thực thi để hiển thị cho người dùng. RQ has supports for this request with meta properties of the task object. Dưới đây là phiên bản cập nhật của nhiệm vụ example[] để tạo báo cáo [report] về tiến độ thực thi

ứng dụng/nhiệm vụ. py. Độ thực thi của nhiệm vụ nền

1

2

3

4

5

6

7

8

9

10

11

12

13

14

nhập thời gian

từ rq nhập get_current_job

 

def ví dụ[giây]:

    công việc = get_current_job[]

    in['Bắt đầu nhiệm vụ']> công việc = hàng đợi. enqueue['ứng dụng. nhiệm vụ. dụ', 30]

>>> công việc. siêu dữ liệu

{}

>>> công việc. làm mới[]

>>> công việc. siêu dữ liệu

{'tiến trình'. 13. 043478260869565}

>>> công việc. làm mới[]

>>> công việc. siêu dữ liệu

{'tiến trình'. 69. 56521739130434}

>>> công việc. làm mới[]

>>> công việc. siêu dữ liệu

{'tiến trình'. 100}

>>> công việc. is_finished

Thật

Như bạn có thể thấy ở trên, chúng ta có thể truy cập và đọc thông tin cần thiết từ thuộc tính myblog-tasks6. Phương thức rq worker0 cần được sử dụng để cập nhật thông tin từ Redis

Tạo cơ sở dữ liệu cho các nhiệm vụ

Ví dụ trên giúp chúng ta hiểu các khái niệm cơ bản của việc thực thi nhiệm vụ và theo dõi tiến trình thực thi. Tuy nhiên, quá trình này trở nên phức tạp hơn đối với các ứng dụng Web. Lý do là vì khi một trong các nhiệm vụ này được khởi động theo yêu cầu từ phần mềm khách hàng [máy khách], yêu cầu sẽ kết thúc tại một thời điểm nào đó và khi đó, các thông tin liên quan đến yêu cầu [ . Nhưng bởi vì chúng ta cần biết mỗi tác vụ đang được thực thi xuất phát từ yêu cầu nào của người dùng, chúng ta sẽ cần sử dụng đến cơ sở dữ liệu để lưu lại các thông tin cần thiết. Sau đây là mô hình dữ liệu cho các nhiệm vụ

ứng dụng/mô hình. py. Mô hình nhiệm vụ

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

.. .

nhập redis

nhập rq

 

lớp Người dùng[UserMixin, db.Mô hình].

    . . .

    nhiệm vụ = db. mối quan hệ['Nhiệm vụ', backref='user', lazy='dynamic']

 

.. .

 

lớp Nhiệm vụ[db. Mô hình].

    id = db. Cột[db. Chuỗi[36], primary_key=True]

    tên = db. Cột[db. Chuỗi[128], index=True]

    mô tả = db. Cột[db. Chuỗi[128]]

    user_id = db. Cột[db. Số nguyên, db. Khóa ngoại['người dùng. id']]

    hoàn thành = db. Cột[db. Boolean, mặc định=Sai]

 

    def get_rq_job[self]:

        thử.

            rq_job = rq. công việc. Công việc. tìm nạp[chính mình. id, kết nối=current_app . .redis]

        ngoại trừ [redis. ngoại lệ. Lỗi Redis, rq. ngoại lệ. NoSuchJobError].

            return Không có

        return rq_job

 

    def get_progress[self]:

        công việc = bản thân. get_rq_job[]

        trả lại công việc. siêu dữ liệu. lấy['tiến độ', 0] if job is not None else 100

Một điểm khác biệt đáng lưu ý giữa mô hình này và mô hình dữ liệu chúng ta đã sử dụng trước đây là chúng ta sử dụng dữ liệu kiểu chuỗi [chuỗi] để chọn từ khóa chính rq worker1 thay vì số nguyên [số nguyên]. Lý do là vì chúng ta sẽ không sử dụng từ khóa chính do cơ sở dữ liệu tạo ra mà sẽ sử dụng id cho tác vụ do RQ tạo ra

Mô hình dữ liệu này sẽ chứa đầy đủ tên của tác vụ [tên đủ điều kiện] – cũng là tên chúng ta sẽ gửi đến RQ, đặc tả của tác vụ [mô tả] để hiển thị cho người dùng, quan hệ với người dùng đã yêu cầu . Trường rq worker2 giúp chúng tôi phân biệt giữa các nhiệm vụ đã kết thúc và các nhiệm vụ đang được thực hiện bởi vì các nhiệm vụ đang được thực hiện cần được giải thích đặc biệt để hiển thị cập nhật về tiến độ

Phương thức rq worker3 là một hàm trợ giúp để truy cập đến một công việc có thể thực hiện được của RQ với rq worker1 tương ứng của nhiệm vụ từ mô hình dữ liệu. Trong hàm trợ giúp này, chúng ta gọi hàm rq worker5 để truy cập đến thực thể rq worker6 từ dữ liệu tương ứng của Redis. Sau đó, phương thức rq worker7 sẽ gọi hàm trợ giúp này và trả về tỷ lệ hoàn thành của nhiệm vụ. Phương thức này dựa trên một số giả định như sau

  • Nếu rq worker1 của nhiệm vụ trong mô hình dữ liệu không tồn tại trong hàng đợi của RQ, điều này có nghĩa là nhiệm vụ đã hoàn thành và dữ liệu trong hàng đợi đã quá xếp hạng và đã bị xóa khỏi hàng đợi. Vì vậy, trong trường hợp này, tỷ lệ hoàn thành của nhiệm vụ là 100
  • Ở chiều ngược lại, nếu một nhiệm vụ tồn tại nhưng lại không có dữ liệu tương ứng từ tính toán myblog-tasks6, chúng ta có thể kết luận rằng nhiệm vụ sắp được đặt để được thực hiện nhưng chưa được thi hành trong thời điểm đó. . Vì vậy, tỷ lệ hoàn thành sẽ là 0 trong trường hợp này

Như thường lệ, chúng ta cần cập nhật cơ sở dữ liệu với những thay đổi này

1

2

[myenv] $ flask db migrate -m "tasks"

[myenv] $ flask db upgrade

Chúng ta cũng cần đưa mô hình dữ liệu mới này vào ngữ cảnh lệnh của ứng dụng [shell context] để có thể sử dụng nó trong một phiên làm việc từ chế độ dòng lệnh mà không cần phải tham khảo

blog của tôi. py. Add task data module and command context

1

2

3

4

5

6

7

8

9

10

từ ứng dụng nhập create_app, db, cli

từ ứng dụng. mô hình nhập Người dùng, Đăng, Message, Notification, Task

 

app = create_app[]

cli. đăng ký[ứng dụng]

 

@ứng dụng. shell_context_processor

def make_shell_context[].

    return {'db'. db, 'Người dùng'. Người dùng, 'Đăng'. Đăng, 'Thông báo'. Thông báo,

            'Thông báo'. Thông báo, 'Nhiệm vụ'. Nhiệm vụ}

Tích hợp RQ với ứng dụng Flask

Địa chỉ để kết nối với dịch vụ Redis cần được thêm vào cấu hình cho ứng dụng

1

2

3

lớp Cấu hình[đối tượng]:

    . . .

    REDIS_URL = os. môi trường. lấy['REDIS_URL'] or 'redis://'

Cũng như trước đây, địa chỉ thật để kết nối với Redis sẽ được đưa vào một biến môi trường và nếu biến môi trường này không tồn tại, chúng ta sẽ sử dụng địa chỉ kết nối mặc định với giả thuyết rằng Redis đang chạy

Redis và RQ sẽ được khởi động trong quá trình tạo ứng dụng

ứng dụng/__init. py. Tích hợp RQ

1

2

3

4

5

6

7

8

9

10

11

12

.. .

từ redis nhập Redis

nhập rq

 

.. .

 

def create_app[config_class=Config]:

    . . .

    ứng dụng. redis = Redis. from_url[ứng dụng. config['REDIS_URL']]

    ứng dụng. task_queue = rq. Hàng đợi['myblog-tasks', connection=app.redis]

 

    . . .

Trong đoạn mã trên, example[]0 sẽ là hàng đợi để nhận các nhiệm vụ được gửi đến. Sự kết nối giữa hàng đợi với ứng dụng theo cách này rất tiện lợi vì nó cho phép chúng ta truy cập hàng đợi từ bất kỳ nơi nào trong ứng dụng qua biến số example[]1. Để có thể dễ dàng gửi các nhiệm vụ đến hàng đợi hoặc kiểm tra tình trạng của họ, chúng tôi sẽ tạo thêm một vài chức năng trợ giúp trong lớp example[]2

ứng dụng/mô hình. py. Các hàm trợ giúp trong cấu hình dữ liệu người sử dụng

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

.. .

 

lớp Người dùng[UserMixin, db.Mô hình].

    . . .

 

    def launch_task[self, name, description, *args, **kwargs]:

        rq_job = current_app. task_queue. enqueue['app. tasks. ' + name, self. id,

                                                *args, **kwargs]

        task = Task[id=rq_job. get_id[], name=name, description=description,

                    user=self]

        db. session. add[task]

        return task

 

    def get_tasks_in_progress[self].

        return Task. query. filter_by[user=self, complete=False]. all[]

 

    def get_task_in_progress[self, name].

        return Task. query. filter_by[name=name, user=self,

                                    complete=False]. first[]

Phương thức example[]3 sẽ đảm nhiệm việc gởi một tác vụ đến hàng đợi RQ đồng thời đưa các dữ liệu có liên quan vào cơ sở dữ liệu. Tham số example[]4 là tên của hàm được định nghĩa trong app/tasks. py. Khi gởi một tác vụ đến hàng đợi, hàm sẽ ghép chuỗi example[]5 và tên của tác vụ để tạo thành tên đầy đủ của tác vụ [fully qualified name]. Tham số example[]6n là một mô tả về tác vụ để hiển thị cho người sử dụng. Đối với hàm xuất file cho các bài viết sẽ được thực hiện ở phần sau, chúng ta sẽ sử dụng tên example[]7 và mô tả tương ứng là example[]8. Các tham số còn lại là các tham số theo vị trí [positional] và tham số theo từ khóa [keywords] và sẽ được truyền vào tác vụ. Hàm này bắt đầu bằng việc gọi phương thức enqueue[]để gởi tác vụ đến hàng đợi. Sau khi phương thức enqueue[] hoàn thành, nó sẽ trả về một đối tượng tác vụ có chứa rq worker1 đã được RQ gán cho tác vụ, nhờ đó chúng ta có thể tạo ra dữ liệu cho đối tượng Queue2 tương ứng trong cơ sở dữ liệu

Lưu ý rằng hàm example[]3 thêm một đối tượng tác vụ mới vào phiên làm việc nhưng không gọi lệnh Queue4 để lưu dữ liệu tác vụ. Nhìn chung, cách tốt nhất để làm việc với một phiên làm việc của cơ sở dữ liệu là ở các hàm ở mức độ cao hơn để có thể kết hợp nhiều cập nhật từ các hàm ở mức độ thấp hơn vào một giao dịch [transaction] cơ sở dữ liệu duy nhất. Tuy nhiên, đây cũng không phải một quy định bắt buộc, và trong thực tế thì bạn sẽ thấy một ngoại lệ khi chúng ta thực hiện một lệnh lưu dữ liệu trong một hàm con ở phần sau của bài viết này

Phương thức Queue5 sẽ trả về một danh sách đầy đủ các hàm tương ứng với một user đang ở trong hàng đợi. Trong phần sau, chúng ta sẽ sử dụng phương thức này để hiển thị các thông tin về các hàm đang được thực thi cho người dùng

Và cuối cùng, phương thức Queue6 là một phiên bản đơn giản hơn để trả về một tác vụ được chỉ định. Chúng ta không cho phép người dùng thực hiện đồng thời hai nhiệm vụ cùng loại, vì vậy, trước khi chúng ta bắt đầu một nhiệm vụ, chúng ta có thể sử dụng phương thức này để kiểm tra xem có nhiệm vụ cùng loại nào không

Gởi email từ các tác vụ RQ

Như đã đề cập từ đầu bài viết, khi tác vụ xuất file hoàn tất trong chế độ nền, user sẽ nhận được một email với một file đính kèm có chứa toàn bộ các bài viết của họ theo định dạng JSON. Để làm được điều này, chúng ta cần mở rộng chức năng gởi email đã được thực hiện trong Phần 10 ở hai phương diện. Thứ nhất, chúng ta cần hỗ trợ cho chức năng đính kèm file [attachment] để có thể gởi kèm file JSON. Thứ hai, hàm Queue7 luôn thực hiện việc gởi email bất đồng bộ bằng cách sử dụng một luồng ở chế độ nền. Vì chúng ta đã có các tác vụ xuất file thực hiện trong chế độ nền trong một luồng riêng, việc có thêm một luồng khác chỉ để gởi email không hợp lý. Do đó, chúng ta cần hỗ trợ cả hai quá trình gởi email đồng bộ và bất đồng bộ

Vì Flask-Mail có hỗ trợ cho chức năng đính kèm file, chúng ta chỉ cần sửa lại hàm Queue7 để nhận thêm một tham số mới cho file đính kèm và sau đó thiết lập chúng trong đối tượng Queue9. Và để chuyển đổi giữa việc gởi email trong chế độ nền hoặc trong chế độ nổi [foreground] của ứng dụng, chúng ta sẽ thêm tham số Redis0 như sau

app/email. py. Gởi email với file đính kèm

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

.. .

 

def send_email[subject, sender, recipients, text_body, html_body,

               attachments=None, sync=False].

    msg = Message[subject, sender=sender, recipients=recipients]

    msg. body = text_body

    msg. html = html_body

    if attachments.

        for attachment in attachments.

            msg. attach[*attachment]

    if sync.

        mail. send[msg]

    else.

        Chủ đề[mục tiêu=send_async_email,

            args=[current_app._get_current_object[], tin nhắn . ]].bắt đầu[]

Phương thức Redis1 trong lớp Massage sẽ nhận ba tham số liên quan đến file đính kèm. file name, file type and data in file. Tên tệp chỉ là một tên gọi để hiển thị cho người sử dụng và không cần thiết phải là một tệp thực sự. Kiểu định nghĩa tệp thuộc loại tệp nào để các chương trình nhận email có thể xử lý hợp lý. Ví dụ như nếu bạn gửi một tệp có kiểu là Redis2, chương trình nhận email sẽ biết đó là một tệp ảnh và hiển thị hình ảnh tương ứng cho người nhận. Bởi vì chúng tôi sẽ xuất bản bài viết trong tệp theo định dạng JSON, chúng tôi sẽ sử dụng tệp kiểu là Redis3. Tham số thứ ba và cũng là tham số cuối cùng sẽ là một chuỗi ký tự hoặc byte với nội dung là dữ liệu trong tệp

Để đơn giản hóa, tham số Redis4 trong hàm Queue7 sẽ là một danh sách các bộ dữ liệu [danh sách tĩnh – tương tự như danh sách nhưng không thay đổi được sau khi khởi tạo]. Mỗi bộ sẽ bao gồm ba phần tử tương ứng với ba tham số của hàm Redis1. Với cách thiết lập này, chúng ta sẽ truyền từng phần tử trong danh sách dưới dạng tuple cho hàm Redis1 thay thì ba tham số riêng rẽ. Trong Python, nếu bạn có một danh sách hoặc là một tuple với các tham số bạn muốn truyền cho một hàm, bạn có thể sử dụng cú pháp Redis8 để trình dịch Python tự động tạo ra các tham số tương ứng thay vì sử dụng cú pháp truyền thống . Ví dụ như nếu bạn có danh sách enqueue[]0, cách gọi hàm mới này sẽ truyền hai tham số giống như khi bạn gọi hàm với biểu thức enqueue[]1. Nếu không có ký tự enqueue[]2, hàm sẽ được gọi với một tham số đơn là danh sách được truyền vào

Và để thực hiện công việc gửi email đồng bộ, chúng ta cần gọi hàm enqueue[]3 khi giá trị của Redis0 là enqueue[]5

Các hàm trợ giúp

Nhiệm vụ example[] trong ví dụ trên chỉ là một chức năng độc lập và đơn giản, nhưng nhiệm vụ để xuất các bài viết ra tệp sẽ cần sử dụng đến các chức năng trong ứng dụng như truy cập cơ sở dữ liệu và gửi email. Bởi vì nhiệm vụ này sẽ được thực hiện trong một tiến trình riêng, chúng ta cần khởi động tạo Flask-SQLAlchemy và Flask-Mail, và các nhiệm vụ này sẽ cần sử dụng cấu hình thông tin từ các ứng dụng có thể thực hiện được. Vì vậy, chúng tôi sẽ bổ sung một ứng dụng Flask thực và ngữ cảnh ứng dụng vào ứng dụng/tác vụ mô-đun đầu tiên. py

ứng dụng/nhiệm vụ. py. Create application and context

1

2

3

4

từ ứng dụng nhập create_app

 

app = create_app[]

ứng dụng. app_context[]. đẩy[]

Thực hiện ứng dụng được tạo ra trong mô-đun này bởi vì đây là mô-đun duy nhất được các trình phục vụ RQ sẽ tham khảo để tham khảo. When you use enqueue[]7, module myblog. py trong thư mục gốc sẽ tạo ra khả năng ứng dụng thực sự, nhưng các tiến trình phục vụ của RQ hoàn toàn không biết về điều này. Vì vậy, chúng tôi sẽ cần tạo ra khả năng ứng dụng riêng cho các nhiệm vụ cần đến nó. Bạn đã thấy phương thức enqueue[]8 trong một vài nơi khác nhau trước đây và sử dụng phương thức enqueue[]9 với ngữ cảnh ứng dụng sẽ làm cho ứng dụng trở thành ứng dụng có thể thực hiện được, đồng thời cho phép các thư viện mở rộng như là . Nếu không có ngữ cảnh ứng dụng, biểu thức enqueue[]1 sẽ thông báo lỗi

Chúng ta sẽ tìm giải pháp để có thể tạo báo cáo tiến độ khi nhiệm vụ đang chạy. Ngoài việc truyền các thông tin tiến độ thông qua từ điển enqueue[]2, chúng tôi cũng muốn gửi các thông báo với các thông tin này đến trình duyệt để cập nhật và hiển thị cho người sử dụng bằng kỹ thuật AJAX. Chúng ta sẽ thực hiện điều này với cơ chế thông báo đã được xây dựng trong Phần 21. Các bản cập nhật sẽ được thực hiện theo cách tương tự như ô thông báo về số tin nhắn chưa được đọc. Khi máy chủ bắt đầu sử dụng một mẫu, nó sẽ gửi thông tin về tiến độ “tĩnh” nhận được từ enqueue[]2 vào đó. Nhưng khi trang Web đã được tải về trình duyệt của người sử dụng, hệ thống thông báo sẽ cập nhật tỉ lệ hoàn thành của các tác vụ. Và để làm việc với hệ thống thông báo, chúng ta cần làm nhiều hơn so với ví dụ trên. Đầu tiên, chúng ta sẽ tạo ra một hàm đóng gói [wrapper] cho việc cập nhật tiến độ

app/tasks. py. Thiết lập tiến độ của tác vụ

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

từ rq nhập get_current_job

from app import db

from app. models import Task

 

.. .

 

def _set_task_progress[progress].

    công việc = get_current_job[]

    if job.

        job. meta['progress'] = progress

        công việc. save_meta[]

        task = Task. query. get[job. get_id[]]

        task. user. add_notification['task_progress', {'task_id'. job. get_id[],

                                                     'progress'. progress}]

        if progress >= 100.

            task. complete = True

        db. session. commit[]

Tác vụ xuất file có thể gọi hàm enqueue[]4 để ghi nhận tỉ lệ hoàn thành. Hàm này sẽ ghi tỉ lệ phần trăm vào từ điển enqueue[]2 và lưu vào Redis. Sau đó, nó sẽ tiến hành nạp tác vụ tương ứng từ cơ sở dữ liệu và phương thức enqueue[]6 có sẵn để tạo ra một thông báo đến enqueue[]7 cũng là user đã yêu cầu tác vụ. Thông báo này sẽ được đặt tên là enqueue[]8 và sẽ có dữ liệu là một từ điển bao gồm hai đối tượng. định danh tác vụ và tỉ lệ phần trăm. Chúng ta sẽ thêm mã JavaScrip để xử lý kiểu thông báo mới này trong phần sau

Hàm sẽ kiểm tra tiến độ của tác vụ. Nếu tác vụ đã hoàn thành, nó sẽ cập nhật thuộc tính rq worker2 của đối tượng tác vụ trong cơ sở dữ liệu. Cuối cùng, lệnh commit được thực hiện để bảo đảm rằng các đối tượng tác vụ và thông báo cho người sử dụng vừa được tạo ra qua hàm enqueue[]6được lưu vào cơ sở dữ liệu. Chúng ta cần phải rất cẩn thận với cách thiết kế nhiệm vụ cha để tránh những thay đổi trong cơ sở dữ liệu bởi vì lệnh cam kết cũng sẽ lưu lại những thay đổi này nếu có

Xây dựng tác vụ xuất file

Đến đây, chúng ta đã có tất cả các thành phần cần thiết để xây dựng hàm xuất file. Cấu trúc của hàm này như sau

app/tasks. py. Cấu trúc tổng quát của hàm xuất file

1

2

3

4

5

6

def export_posts[user_id].

    try.

        # Đọc các bài viết của user từ cơ sở dữ liệu

        # Gởi email có các bài viết đến user

    except.

        # Xử lý lỗi

Tại sao chúng ta phải sử dụng khối try/except block như trên? Mã ứng dụng trong các hàm xử lý yêu cầu không bị ảnh hưởng bởi các lỗi phát sinh ngoài dự kiến vì Flask sẽ tự động phát hiện và xử lý các lỗi này, đồng thời ghi nhận các tình huống lỗi trong các file nhật ký mà chúng ta đã thiết lập cho ứng dụng. Tuy nhiên, hàm này sẽ được thực thi trong một tiến trình riêng do RQ điều khiển chứ không phải Flask. Vì vậy nếu có một lỗi ngoài dự kiến xảy ra, tác vụ sẽ bị hủy bỏ, RQ sẽ hiển thị thông báo lỗi trên cửa sổ lệnh và trở lại chế độ chờ để đợi tác vụ mới. Vì vậy, nếu bạn không kiểm tra các thông tin trạng thái của các tiến trình phục vụ trong RQ hoặc đưa chúng vào một hệ thống nhật ký, bạn sẽ không bao giờ biết về các lỗi đã xảy ra

Chúng ta sẽ đi vào chi tiết trong ba phần được chú thích trong đoạn mã trên, bắt đầu với phần đơn giản nhất là phần xử lý lỗi nằm ở cuối cùng

app/tasks. py. Xử lý lỗi khi xuất file

1

2

3

4

5

6

7

8

9

import sys

.. .

 

def export_posts[user_id].

    try.

        . . .

    except.

        _set_task_progress[100]

        app. logger. error['Unhandled exception', exc_info=sys. exc_info[]]

Khi lỗi xảy ra, chúng ta sẽ đánh dấu tác vụ là đã hoàn thành bằng cách thiết lập tiến độ là 100%, và sau đó lưu lại các thông tin liên quan vào hệ thống nhật ký, bao gồm thông báo lỗi, strack trace, và các thông tin được trả về từ hàm enqueue[]1. Trong đoạn mã này, với việc sử dụng nhật ký ứng dụng của Flask, chúng ta cũng thấy tác dụng của hệ thống nhật ký đã được xây dựng trước đây. Ví dụ như trong Phần 7, chúng ta đã thiết lập cấu hình để ứng dụng gởi các thông báo lỗi đến người quản trị qua email. Khi sử dụng enqueue[]2, chúng ta cũng có thể áp dụng cách làm này cho các lỗi trong tác vụ xuất file

Tiếp theo, chúng ta sẽ viết mã nguồn để xuất file. Về bản chất, công việc này thật ra là thực hiện một truy vấn đến cơ sở dữ liệu, duyệt kết quả truy vấn và đưa dữ liệu tương ứng vào một từ điển

app/tasks. py. Đọc các bài viết của user từ cơ sở dữ liệu

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

nhập thời gian

from app. models import User, Post

 

.. .

 

def export_posts[user_id].

    try.

        user = User. query. get[user_id]

        _set_task_progress[0]

        data = []

        i = 0

        total_posts = user. posts. count[]

        for post in user. bài đăng. order_by[Post. timestamp. asc[]].

            data. append[{'body'. post. body,

                         'dấu thời gian'. post. dấu thời gian. isoformat[] + 'Z'}]

            time. sleep[5]

            i += 1

            _set_task_progress[100 * i // total_posts]

 

        # Gởi email có các bài viết đến user

    except.

        . . .

Với mỗi bài viết, từ điển trên sẽ tạo ra một từ mới với hai phần từ, văn bản trong bài viết và thời gian bài viết được đăng. Thòi gian đăng bài sẽ theo chuẩn ISO 8601. Đối tượng enqueue[]3 trong Python mà chúng tôi sử dụng không chứa thông tin về múi giờ [múi giờ], vì vậy chúng tôi sẽ thêm ký tự 'Z' phía sau để chỉ ra rằng đây là thời gian theo UTC

Phần mã tiếp theo phức tạp hơn vì chúng ta cần theo dõi tiến độ. Chúng ta sẽ sử dụng biến đếm enqueue[]4 và thực hiện một truy vấn dữ liệu khác trước khi khởi tạo vòng lặp để lấy tổng số bài viết và gán cho biến enqueue[]5. Trong mỗi lần lặp lại, chúng ta có thể cập nhật tiến độ hoàn thành nhiệm vụ với một số trong khoảng từ 0 đến 100 bằng cách sử dụng enqueue[]4 kết hợp với enqueue[]5

Bên cạnh đó, chúng tôi cũng gọi hàm enqueue[]8 trong mỗi lần lặp để kéo dài thời gian thực hiện nhiệm vụ và giúp chúng tôi có thể thấy được tiến trình rõ ràng hơn dù khi nhiệm vụ này chỉ xuất ra một số ít bài viết

Và sau đây là phần cuối cùng của hàm này để gửi email đến người dùng với tất cả các thông tin trong biến enqueue[]9 dưới dạng tệp đính kèm

ứng dụng/nhiệm vụ. py. Gửi email có bài viết cho người dùng

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

nhập json

từ flask nhập render_template

từ ứng dụng. email nhập send_email

 

#

 

def export_posts[user_id].

    try.

        #.

 

        send_email['[Myblog] Bài đăng trên blog của bạn',

                        người gửi=ứng dụng. config['QUẢN TRỊ'][0], recipients=[user.email],

                        text_body=render_template['email/export_posts.txt', người dùng=người dùng],

                        html_body=render_template['email/export_posts.html', người dùng=người dùng],

                        tệp đính kèm=[['posts.json', 'application/json',

                              json. kết xuất[{'bài đăng': dữ liệu}, thụt lề=4]]],

                        đồng bộ hóa=True]

    except.

        #.

Ở đây, công việc của họ chỉ đơn giản là gọi hàm Queue7. Tệp đính kèm được định nghĩa là một tuple với ba phần tử sẽ được truyền theo phương thức Redis1 của các đối tượng thay đổi Queue9 trong thư viện Flask-Mail. Phần tử thứ ba của tuple này là nội dung của tệp đính kèm và được tạo bởi hàm example[]3 của Python

Chúng ta cũng sử dụng hai mẫu mới trong đoạn mã trên nội dung của email theo định dạng văn bản thông thường và HTML. Sau đây là nội dung của các email mẫu dưới dạng văn bản

ứng dụng/mẫu/email/export_posts. txt. Mẫu cho email xuất tệp định dạng văn bản thông thường

1

2

3

4

5

Kính gửi {{ người dùng. tên người dùng }},

 

Vui lòng tìm được đính kèm các kho lưu trữ of your posts that you requested.

 

Trân trọng,

And template email format HTML

ứng dụng/mẫu/email/export_posts. html. Mẫu cho email xuất tệp định dạng HTML

1

2

3

Kính gửi người dùng {{. tên người dùng }},

Vui lòng tìm tệp đính kèm lưu trữ các bài đăng mà bạn đã yêu cầu.

Trân trọng,

Chức năng xuất tệp trong ứng dụng

Chúng ta đã chuẩn bị xong tất cả các thành phần chính để hỗ trợ tác vụ xuất tệp. Công việc còn lại chỉ là kết nối chức năng này với ứng dụng để người dùng có thể gửi tệp xuất yêu cầu và nhận được email có bài viết của họ

Sau đây là hàm xử lý mới example[]4 cho chức năng này

ứng dụng/chính/tuyến đường. py. Hàm xử lý chức năng xuất file

1

2

3

4

5

6

7

8

9

@bp. tuyến đường['/export_posts']

@login_required

def export_posts[].

    if current_user. get_task_in_progress['export_posts']:

        flash[_['An export task is currently in progress']]

    else.

        current_user. launch_task['export_posts', _['Exporting posts. ']]

        db. session. commit[]

    return redirect[url_for['main. user', username=current_user. username]]

Hàm sẽ kiểm tra xem user có tác vụ xuất file nào trong hàng đợi hay không, và nếu có nó sẽ hiển thị một thông điệp thay vì bắt đầu một tác vụ mới vì chúng ta muốn tránh tình trạng một user có thể thực hiện nhiều tác vụ xuất file cùng lúc. Chúng ta kiểm tra điều kiện này bằng cách sử dụng phương thức Queue6 đã được tạo ra ở phần trước

Nếu user hiện không có tác vụ xuất file nào, hàm sẽ bắt đầu một tác vụ mới bằng cách gọi hàm example[]3. Tham số đầu tiên cho hàm này là tên của hàm sẽ được gởi đến tiến trình phục vụ RQ và được bắt đầu với chuỗi example[]5 Tham số thứ hai là một chuỗi mô tả về tác vụ sẽ được hiển thị với người sử dụng. Cả hai giá trị này sẽ được lưu vào đối tượng Queue2 trong cơ sở dữ liệu. Tiếp theo, hàm sẽ định hướng user đến trang hồ sơ cá nhân và kết thúc

Chúng ta cũng cần tạo ra một liên kết để user có thể yêu cầu xuất file. Nơi thích hợp nhất cho liên kết này là trong trang hồ sơ cá nhân, bên dưới liên kết “Edit your profile” và chỉ được hiển thị khi nào user xem chính trang hồ sơ cá nhân của họ

app/templates/user. html. Liên kết xuất file trong trang hồ sơ cá nhân

1

2

3

4

5

6

7

8

9

10

11

12

13

14

                . . .

                

                    

                        {{ _['Edit your profile'] }}

                    

                

                {% if not current_user. get_task_in_progress['export_posts'] %}

                

                    

                        {{ _['Export your posts'] }}

                    

                

                . . .

                {% endif %}

Ngoài ra, liên kết này cũng không được hiển thị nếu một user đang thực hiện tác vụ xuất file. Đó là lý do chúng ta có sử dụng điều kiện example[]9 trong template trên

Tại thời điểm này, chức năng xuất file đã hoạt động nhưng không trình bày kết quả cho user. Nếu muốn thử nghiệm, bạn có thể khởi động ứng dụng và tiến trình phục vụ RQ như sau

  • Khởi động Redis
  • Trong một cửa sổ lệnh, khởi động một hoặc nhiều tiến trình phục vụ RQ với lệnh myblog-tasks00
  • Trong một cửa sổ lệnh khác, khởi động ứng dụng Flask với lệnh myblog-tasks01 [và đừng quên thiết lập biến môi trường myblog-tasks02 trước khi nhập lệnh này]
Thông báo tiến độ

Để hoàn thành chức năng này, chúng ta muốn thông báo cho người sử dụng khi một tác vụ nền đang được thi hành và tiến độ theo tỉ lệ phần trăm. Để phục vụ cho mục đích này, chúng ta sẽ sử dụng một thành phần của Bootstrap được gọi là cảnh báo [alert]. Thành phần cảnh báo trong Bootstrap là một khối chữ nhật theo chiều ngang của màn hình và hiển thị một số thông tin cho người dùng. Trước đây, chúng ta đã sử dụng các khối cảnh báo này với màu xanh dương để hiển thị các thông báo cho người dùng. Trong phần này, chúng ta sẽ sử dụng khối cảnh báo với màu lá cây xanh để hiển thị trạng thái của tác vụ xuất tệp và đặt nó bên dưới thanh định hướng như hình dưới đây

ứng dụng/mẫu/cơ sở. html. Warning for export file process in template root

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

.. .

{% chặn nội dung %}

    

Chủ Đề