Tuts: Laravel queues , cách hoạt động và thực hiện bằng Amazon SQS (Phần 1)

Spread the love

PHP không phải là một ngôn ngữ hỗ trợ Asynchronous nhiều vì vậy đôi khi những tác vụ gửi mail thường làm cho trải nghiệm người dùng bị down xuống

(Vì phải chờ gửi mail xong mới có thể reload trang hoặc redirect sang trang mới. Tuy nhiên việc gửi mail thì khá là vô chừng, có khi gửi nhanh mà cũng có khi chậm, lúc nắng lúc mưa)

Bối cảnh


Giả sử bạn có 1 website mà người dùng muốn đăng kí account, có thể là một trang web thương mại điện tử hay là một trang mạng xã hội. Người dùng đang ở trang đăng kí , đã điền đầy tđủ thông tin và nhấn gửi form đăng kí. Và flow xử lý thường thấy sẽ như bên dưới

// store user's data into the database
// send a welcome email to the user
// return "thank you" page

Đầu tiên ta sẽ save xuống DB -> gửi email  -> redirect user sang trang Thank you

Vì PHP code thực hiện từ trên xuống dưới nên người dùng chỉ có thể thấy “Thank you” page sau khi email đã được gửi đi. Vấn đề là **tại sao chúng ta phải bắt người dùng đợi nhỉ ***

Giải pháp sử dụng Hàng Đợi


Định nghĩa:Hàng đợi như tên gọi chỉ là một hàng mà ở đó sự vật / sự kiện sẽ được giải quyết theo thứ tự nào đó. Cụ thể hơn ở đây sự vật / sự kiện chờ được giải quyết chính là các Jobs. Nói một cách khác chúng ta sẽ sắp xếp (hay gọi là đẩy cho nó trực quan) các Jobs (sự vật / sự kiện cần được giải quyết) vào hàng đợi, và ngay lập tức (hoặc sau một khoảng thời gian delay) nào đó thực hiện các Jobs đó.

Vậy thì câu hỏi ở đây : Chúng ta sẽ đẩy Jobs vào hàng đợi như thế nào ?. Laravel có hỗ trợ chúng ta một interface Illuminate\Contracts\Queue\ShouldQueue

Khi muốn đẩy Job vào trong hàng đợi, Job phải implement interface ở trên.

Vậy thì đơn giản ở bước gửi mail, thay vì chúng ta thực hiện việc gửi mail ngay lập tức, ta sẽ đẩy job vào hàng đợi như đoạn code bên dưới

// store user's data into the database
$this->dispatch(new SendEmailJob($user)); //Push this job to queue.
// return "thank you" page immediately

Vậy là thay vì phải đợi xử lý gửi email hoàn tất, người dùng mới có thể thấy được trang thank you đẹp đẽ mà ta đã dày công chuẩn bị cho họ, ngay lập tức đập vào mặt người dùng thơ ngây của chúng ta là một trang “thank you” thấm đẫm tình cảm. Nhưng chuyện đâu có đơn giản thế ? Vậy thì lúc nào Email mới được gửi đi vậy nhỉ ?

Thực hiện Jobs


Tưởng tượng khi bạn đến Ủy ban nhân dân để công chứng giấy tờ, bạn sẽ được nhận một phiếu có số thứ tự của bạn trong hàng đợi, cô thơ ký sẽ giải quyết từng người từng người một theo qui trình bên dưới

  1. Kiểm tra nếu hàng đợi vẫn còn người, cô thơ kí cho gọi người đó vào
  2. Giải quyết công việc của người đó
  3. Kiểm tra hàng đợi tiếp, nếu hết người, cô đi ngủ hoặc ngồi tám với chú bảy kế bên

Cũng như vậy trong Laravel có sẵn một cô thơ kí để giải quyết công việc cho chúng ta, cô ấy tên là Queue Listener. Queue Listener chính là 1 long-running process (không phải cron jobs) sẽ nhận nhiệm vụ kiểm tra nếu có tác vụ tồn đọng trong hàng đợi và thực hiện tác vụ đó. Nếu không có cô thơ kí Queue Listener thì sẽ không có tác vụ nào được thực hiện, và nếu lỡ cô ấy có xì trét tới mức sập thì kể từ đó cũng ko có tác vụ nào trong hàng đợi được thực hiện cả (vậy nên nhớ kiểm tra cô ấy thường xuyên nhé :D)

Để tạo một cô thơ kí trong laravel chỉ cần thực hiện câu lệnh sau (terminal hoặc cmd)

php artisan queue:listen

Ngoài ra chúng ta có thể chỉ định cô thơ kí này nhận nhiệm vụ đối với hàng đợi nào, bên hàng đợi công chứng giấy tờ hay là bên hàng đợi bảo hiểm xã hội đều được.

php artisan queue:listen --queue=queue_name

 Bản thân hàng đợi là ?


Vậy hàng đợi chính xác là gì ? Để trả lời câu hỏi này, ta sẽ trả lời câu hỏi : Chính xác thì mấy cái Jobs khi được dispatch sẽ đi đâu ? Bản thân hàng đợi sẽ là vô nghĩa nếu nó ko có bất kì một Jobs nào trong đó. Vậy đơn giản Hàng đợi là một Danh sách các Jobs cần được thực hiện. OK. Khi hiểu được điều này chúng ta sẽ đến bước tiếp theo -> Chúng ta lưu mấy cái Jobs này ở đâu ? Bằng cách nào mà chúng ta có thể đẩy Jobs vào danh sách ? Chúng ta dùng Queue Drivers

Queue Drivers là ?


Một queue driver là một implementation của interface Illuminatte\Contracts\Queue\Queue . Queue driver nhận nhiệm vụ quản lý các Jobs trong danh sách của nó. Bao gồm lưu trữ – nhận Jobs vào hàng đợi.

Có vài drivers được tạo sẵn với laravel ( database, beanstalkd, redis, amazon sqs … ) nếu không thích chúng ta có thể tự tạo Queue driver cho riêng mình (cái này hôm nào mình sẽ đi sâu)

ví dụ: tôi có thể dùng Amazon SQS để lưu trữ các Jobs của mình tôi có thể làm 3 steps đơn giản sau :

  • cấu hình file config/queue.php với thông tin sqs
  • đặt biến môi trường QUEUE_DRIVER thành sqs

Vậy là xong, sau này khi Job được dispatch thì ở bên Amazon Console tôi có thể thấy thông tin như bên dưới

amazon_sqs
và nếu ta chạy câu lệnh listen như đề cập bên trên thì sau khi thực hiện Jobs, số message ở trong SQS sẽ tự động giảm xuống.

Ồ hay quá nhỉ ? Thế nhưng AWS SQS là dịch vụ tốn phí , tôi muốn tự mở lối tiên phong có được không ? được chứ, miễn là bạn sẽ implement queue của bạn dựa trên interface Illuminate\Contracts\Queue\Queue

Giải thích về Listener và Workers


Bây giờ chúng ta đào sâu tìm hiểu tại sao cô thơ kí có thể phát hiện được có người trong hàng đợi và cô ấy thực hiện Jobs như thế nào. Khi chúng ta chạy câu lệnh queue:listen thì Illuminate\Quee\Listener::listen() sẽ được triggered. Và nội dung xử lý như đoạn code bên dưới

 public function listen($connection, $queue, $delay, $memory, $timeout = 60)
 {
      $process = $this->makeProcess($connection, $queue, $delay, $memory, $timeout);

       while (true) {
         this->runProcess($process, $memory);
       }
 }

Chính xác thì biến $process sẽ là workers để thực hiện Job của chúng ta. Worker process chính là một Symfony Process Object mà sẽ gọi queue:work một lần khi mà nó bắt đầu

Vòng lặp while (true) cơ bản là nó sẽ chạy mãi mãi, cho đến khi bạn bắt cô thơ kí dừng hoặc là cô ấy bị stress thì cô ấy dừng lại. Chi tiết hơn về cách cô thơ kí thực hiện Jobs như bên dưới

 public function runProcess(Process $process, $memory)
 {
       $process->run(function ($type, $line) {
       $this->handleWorkerOutput($type, $line);
        });

         // Once we have run the job we'll go check if the memory limit has been
         // exceeded for the script. If it has, we will kill this script so a
         // process manager will restart this with a clean slate of memory.
         if ($this->memoryExceeded($memory)) {
           $this->stop();
         }    
 }

Như dự đoán, cô ấy sẽ dừng lại khi memory đầy (lúc đó cô ấy đi ngủ hay như thế nào thì chưa biết :D). Chúng ta có thể set giới hạn cho bộ nhớ của cô thơ kí bằng tham số --memory trong câu lệnh artisan ở trên. Tuy nhiên về mặc định của cổ là 128 MB. Nãy giờ có lẽ hơi rối nên chúng ta sẽ túm cái váy lại như sau

Khi chúng ta chạy câu lệnh queue:listen các công việc bên dưới sẽ được thực hiện tuần tự

  1. Listener::listen() method sẽ được kích hoạt, nó sẽ tạo ra 1 Symfony Process gọi đến queue:work và chứa nó trong biến $process
  2. runProcess($process) được gọi trong vòng lặp vô hạn (lần đầu tiên)
  3. run() method được kích hoạt trong Process (process sẽ starts câu lệnh queue:work tại thời điểm này)
  4. queue:work sẽ chạy câu đoạn xử lý trong Worker::pop() , nó sẽ chạy Job tiếp theo nằm trong hàng đợi (nếu có) hoặc sleeps nếu không có
  5. Nếu Job đã được hoàn tất và không còn Jobs nào tồn đọng, chương trình sẽ check Llisstener class đã tiêu thụ quá số memory nó được cung cấp chưa, nếu đã quá thì tự động Listener sẽ bị dừng lại và kết thúc
  6. runProcess($process) thực hiện tiếp tục (lần 2)
  7. Tiếp tục như thế

Bên dưới là bước số 4 (trong lớp Worker)

 public function pop($connectionName, $queue = null, $delay = 0, $sleep = 3, $maxTries = 0)
 {
 try {
 $connection = $this->manager->connection($connectionName);

 $job = $this->getNextJob($connection, $queue);

 // If we're able to pull a job off of the stack, we will process it and
 // then immediately return back out. If there is no job on the queue
 // we will "sleep" the worker for the specified number of seconds.
 if (! is_null($job)) {
 return $this->process(
 $this->manager->getName($connectionName), $job, $maxTries, $delay
 );
 }
 } catch (Exception $e) {
 if ($this->exceptions) {
 $this->exceptions->report($e);
 }
 }

 $this->sleep($sleep);

 return ['job' => null, 'failed' => false];
 }

Và chúng ta có thể thấy cô thơ ký siêng năng tới mức nào, cô ấy chỉ mong hết việc và được đi ngủ 🙁

Phần tiếp theo chúng ta sẽ bàn về

  • Các phương án listen
  • Đối ứng trường hợp failed job, succeed jobs
  • Tutorials với Amazon SQS