NestJS と BullMQ を使ってジョブキューを実装してみる

Pocket

NestJS と BullMQ を使ってジョブキューを実装してみる

こんにちは、mkrydik です。

今回は、NestJS と BullMQ を組み合わせてジョブキューを実装してみます。

NestJS とは

NestJS は、Node.js 製のアプリケーションフレームワークです。同様のものに Express.js があったり、他言語だと Flask (Python) や Sinatra (Ruby) みたいなものと思ってもらえれば良いです。

NestJS の特徴は、Angular に似たモジュール構成が取れる点と、Angular 同様に DI が実現できる点です。フロントエンドを Angular で作成している場合、特に同じ書き味でバックエンドを実装できてとても開発体験が良いです。

ジョブキューと BullMQ

ジョブキューは、あるタスクを複数のサーバで分散処理したり、タスクをリトライしたりできる仕組みです。

アプリケーションサーバがリクエストに対してレスポンスを返すのとは別に、例えばメール送信などのような非同期処理や、時間のかかる処理を別個に任せられるというわけです。

BullMQ は Node.js 製のジョブキューライブラリです。旧バージョンに「Bull」という無印のものもありますが、今回はバージョンアップ版である BullMQ を用います。

Redis がジョブを仲介する

BullMQ はバックエンドに Redis を使用します。Redis はインメモリ DB と呼ばれるもので、データベースの種類としては NoSQL に分類されます。

BullMQ の Queue が Redis にジョブ情報を登録し、Worker が Redis からジョブ情報を取得して処理を行う、といった形で、Redis をハブとしてジョブが振り分けられる構成となります。

実際に実装してみる

それでは実際に実装してみます。

まずは Redis をマシンにインストールします。今回は macOS と WSL2 Ubuntu で検証しましたが、ともに Homebrew で Redis をインストールしました。

$ brew install redis

$ redis-cli --version
redis-cli 7.0.12
$ redis-server --version
Redis server v=7.0.12 sha=00000000:0 malloc=jemalloc-5.2.1 bits=64 build=cda19910c5c42a3c

# 次のコマンドで localhost:6379 に Redis サーバが起動する
$ redis-server

Redis は起動したままにしておいて、次に NestJS プロジェクトを作ります。

# NestJS CLI を用意する
$ npm install -g @nestjs/cli
$ nest --version
10.1.7

# NestJS プロジェクトを作成する
$ nest new practice-nestjs-bullmq
$ cd ./practice-nestjs-bullmq/

# NestJS で BullMQ を使えるようにするライブラリと BullMQ 本体をインストールする
$ npm install --save @nestjs/bullmq bullmq

# Worker を担うクラスを生成する
$ nest g class my-processor --flat

これで下準備は完了です。

BullModule の定義

まずは app.module.ts を次のように更新します。NestJS ではおなじみの、Module Import です。コレにより Redis との接続と、キューの定義が完了しました。

  • app.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';

import { AppController } from './app.controller';
import { MyProcessor } from './my-processor';
import { AppService } from './app.service';

@Module({
  imports: [
    BullModule.forRoot({
      connection: {  // Redis との接続
        host: 'localhost',
        port: 6379
      }
    }),
    BullModule.registerQueue({
      name: 'myQueue'  // キューの名前を任意に決めて登録する
    })
  ],
  controllers: [AppController],
  providers: [
    MyProcessor,  // Worker となるクラスを Providers に登録する
    AppService
  ]
})
export class AppModule { }

キューの実装

次に、ジョブを管理する Queue を作成します。今回は app.service.ts に実装して、最終的に http://localhost:3000/ (NestJS のサーバ) にアクセスした時にジョブが追加されるような仕組みにしてみます。

  • app.service.ts
import { InjectQueue } from '@nestjs/bullmq';
import { Injectable } from '@nestjs/common';
import { Queue } from 'bullmq';

@Injectable()
export class AppService {
  // `BullModule.registerQueue()` で定義したキューと同じ名前をデコレータで注入する
  constructor(@InjectQueue('myQueue') private messageQueue: Queue) { }

  public async addJob(id: number): Promise<string> {
    const job = await this.messageQueue.add('myJob', { id });  // ジョブを追加する・第2引数が渡したいデータ
    console.log('Job Added', id, job.name, job.queueName);
    return `Job Added ${id}`;
  }
}

リクエストを処理するコントローラには、ジョブに渡すデータとなる id (ここではただの連番) を定義してあげます。

  • app.controller.ts
import { Controller, Get } from '@nestjs/common';
import { AppService } from './app.service';

@Controller()
export class AppController {
  private id = 0;  // 適当なデータを用意する

  constructor(private readonly appService: AppService) {}

  @Get()
  public addJob(): Promise<string> {
    this.id++;
    return this.appService.addJob(this.id);  // Service クラスを呼び出す
  }
}

ここまでで、$ npm run start:dev などで NestJS サーバを起動し、ブラウザで http://localhost:3000/ にアクセスすると、

  • Job Added 1

といったレスポンスが届き、コンソールには

  • Job Added 1 myJob myQueue

といったログが出力されているかと思います。キューを作って Redis に登録するまでがこれで完了しました。

実際にタスクを行う Worker を作成する

最後に、追加されたキューに基づいて実際のタスクを実行する、Worker 部分を作成します。

  • my-processor.ts
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Injectable } from '@nestjs/common';
import { Job } from 'bullmq';

@Injectable()  // Providers として解決されるように `@Injectable()` デコレータを追加する
@Processor('myQueue')  // `BullModule.registerQueue()` で定義したキューと同じ名前をデコレータで注入する
export class MyProcessor extends WorkerHost {  // WorkerHost を継承する
  // 継承した WorkerHost に基づいて、`process()` メソッドにやりたいことを実装していく
  public async process(job: Job<any, any, string>): Promise<any> {
    console.log('Process Job', job.name, job.data);  // 渡されたデータは `job.data` で参照可能
    // 実際はここに、時間のかかるタスクなどを実装します
  }
}

ここまで実装したら、再び NestJS サーバを立てて、ブラウザで http://localhost:3000/ に数回アクセスしてみます。すると、コンソールに次のようなログが出力されるかと思います。

Job Added 1 myJob myQueue
Process Job myJob { id: 1 }

Job Added 2 myJob myQueue
Process Job myJob { id: 2 }

ジョブを追加した直後の Job Added ログと、Worker が起動して出力した Process Job ログが確認できます。

ついでに、$ redis-cli で Redis に接続し、> keys * で登録されているキーを確認してみると、BullMQ が作成したデータが入っていることが確認できるでしょう。

以上

というわけで、今回は簡単ですが、NestJS と BullMQ・Redis を使用して、ジョブキューを実現してみました。今回実装したソースコードの全量は以下の GitHub リポジトリに配置しましたので、ご参考にしていただければと思います。

今回の例では Queue と Worker がプロセス単位では分離されていないので効果が分かりにくいかもしれませんが、BullMQ によってジョブのリトライ等も制御できるので、重たい非同期処理を分離したい時に有用でしょう。

NestJS は個人的に気に入っているフレームワークなのですが、いまいち話題を聞かず、BullMQ との組み合わせの文献も少なかったので、今回記事にしてみました。これが皆様の参考になれば幸いです。

Pocket

コメントを残す

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です