graphql-batchを用いたバッチ処理の仕組みについて調べてみたので、まとめておきたいと思います。
graphql-batchの仕組み
公式のexamplesにあるRecordLoaderを使って、実際のバッチ処理を例にして処理流れや仕組みをみていきます。
例として、記事一覧の記事ごとに投稿したユーザーを一緒に取得する場合を想定します。
カスタムローダーの準備
モデルから特定のattribute(カラム)と任意の条件の組み合わせでレコードを取得する際のバッチ処理を扱うもので、belongs_toな関連を持つレコードの取得時に実装されることが多いです。
record_loader.rb
class RecordLoader < GraphQL::Batch::Loader
def initialize(model, column: model.primary_key, where: nil)
super()
@model = model
@column = column.to_s
@column_type = model.type_for_attribute(@column)
@where = where
end
def load(key)
super(@column_type.cast(key))
end
def perform(keys)
query(keys).each { |record| fulfill(record.public_send(@column), record) }
keys.each { |key| fulfill(key, nil) unless fulfilled?(key) }
end
private
def query(keys)
scope = @model
scope = scope.where(@where) if @where
scope.where(@column => keys)
end
end
fieldの定義
:postsはTypes::Postのオブジェクトを複数取得するfieldで、Types::Postのオブジェクトは記事の投稿ユーザーを表すuser fieldを持っています。
user fieldでは、object.user_idをkeyとして、RecordLoaderのforメソッド、loaderメソッドを呼び出しています。
class Types::User < GraphQL::Schema::Object
field :name, String, null: false
end
class Types::Post < GraphQL::Schema::Object
field :user, Types::User, null: false
def user
RecordLoader.for(User, :id).load(object.user_id)
end
end
field :posts, [Types::Post], null: true
def posts(id:)
Post.all
end
RecordLoaderを使用せずにクエリを実行すると、user fieldが記事の取得毎によびだされるため、N+1クエリが発生するからです。
N+1クエリが発生する例
class Types::Post < GraphQL::Schema::Object
field :user, Types::User, null: false
def user
User.find(object.user_id)
end
end
上記のようなN+1クエリを防ぐために、RecordLoaderがどのようにバッチ処理を行っているのかを追っていきます。
Loaderの呼び出し
まず、user fieldでは、RecordLoader.for(User, :id)が呼び出されます。
forメソッドには、RecordLoaderクラスのinitializerで定義した引数を渡します。
内部的には、fieldから渡された引数を元にRecordLoaderクラスのインスタンスを作成し、executorにRecordLoaderクラスのオブジェクトを登録しています。
executorには、forで指定した引数とオブジェクト自身(RecordLoader)の組み合わせをキー(loader_key)としてローダーオブジェクトが登録されます。
lib/graphql/batch/loader.rb
...
else
def self.for(*group_args)
current_executor.loader(loader_key_for(*group_args)) { new(*group_args) }
end
end
...
def self.loader_key_for(*group_args, **group_kwargs)
[self, group_kwargs, group_args]
end
すでに同じキーでローダーオブジェクトが登録されている場合は、既存のローダーオブジェクトを使い回します。
forメソッドにより、RecordLoaderオブジェクトの作成もしくは取得に成功すると、load(object.user_id)でバッチ処理の対象となるモデルのカラムの値をLoaderに渡します。
内部的には、loadメソッドは親クラス(GraphQL::Batch::Loader)のloadメソッドを呼び出しています。
RecordLoaderクラスでは、loadメソッドに渡す引数はクエリの実行時にwhereメソッドの引数となるため、フィルター対象のカラムに合わせた型にキャストしています。
def load(key)
super(@column_type.cast(key))
end
superで親クラスのloadメソッドを呼びだし、Promiseオブジェクトの作成を行い、キューおよびLoaderのキャッシュストレージ(インメモリ)に保存します。
def load(key)
cache[cache_key(key)] ||= begin
queue << key
::Promise.new.tap { |promise| promise.source = self }
end
end
このタイミングでuser fieldではPromiseのオブジェクトがセットされている状態になります。
RecordLoaderの観点からは、user fieldに対して、複数のPromiseオブジェクトを持つ1つのRecordLoaderオブジェクトという形になっています。
Promiseオブジェクトのlazy_resolve
ここから、取得対象のPostの数だけuser fieldが呼び出されることにより、Promiseオブジェクトが作成された後の処理を見ていきたいと思います。
resolverがresolveされた後、lazy_resolve(::Promise, :sync)メソッドにより、作成されたPromiseオブジェクトのsyncメソッドが呼び出されます。
lazy_resolveは、graphql-rubyで用意されている遅延実行用の仕組みです。
lazy_resolve(::Promise, :sync)メソッドは、graphql-batchのセットアップ手順にあるuse GraphQL::Batch
で定義されることになります。
どのようにlazy_resolveが呼び出されているかは、lib/graphql/batch.rbのself.useメソッドを見てみるとよいです。
xx_schema.rb
class MySchema < GraphQL::Schema
query MyQueryType
mutation MyMutationType
use GraphQL::Batch
end
Promiseオブジェクトのsyncメソッドが呼び出されると、Promiseのsourceとして登録されているLoaderクラスのwait(sync)が実行されます。
waitメソッドはloaderのresolveメソッドを実行します。
resolveメソッドはLoaderクラスの継承クラス(RecordLoader)で実装されたperformメソッドを実行します。その際引数として、load_keysとしてキューに保存されていたkeyの一覧が渡されます。
Loaderオブジェクトはすでに呼び出されたPromiseオブジェクトの実行結果を状態として保持しているため、resolved?でperformでロジックの実行が必要がないPromiseオブジェクトの場合はそのまま処理を終了させます。(最初の実行ですでに解決されているため)
def resolve
return if resolved?
load_keys = queue
@queue = nil
around_perform do
perform(load_keys)
end
check_for_broken_promises(load_keys)
rescue => err
reject_pending_promises(load_keys, err)
end
Loaderのperformメソッドでは、受け取ったkeysを元にバッチ処理を実行し、keyに対応するキャッシュ中のPromiseオブジェクトをfulfill(成功したものとして完了)させます。
fulfillする際には、後続の処理(fieldへのマッピングやthenのコールバック)で必要となる値、RecordLoaderの場合は取得したレコードのカラムをkey(loadでkeyにした値)に、実際のレコードをvalueとして引数に渡します。
RecordLoaderでは指定したkeyに対応するレコードが存在しない場合もあるため、keys.each { |key| fulfill(key, nil) unless fulfilled?(key) }で漏れた分のkeyに対応するPromiseを解決しています。
record_loader.rb
...
def perform(keys)
query(keys).each { |record| fulfill(record.public_send(@column), record) }
keys.each { |key| fulfill(key, nil) unless fulfilled?(key) }
end
private
def query(keys)
scope = @model
scope = scope.where(@where) if @where
scope.where(@column => keys)
end
...
ここでバッチ処理が完了し、user fieldへバッチ処理を経て取得した値がマッピングされることになります。
Promiseを使うメリット
ここまでgraphql-batchの処理を追ってきましたが、その中でPromiseオブジェクトがよく登場してきました。
lazy objectを作成してlazy_resolveを使用するという、graphql-rubyで用意されている機能を使うだけでもバッチ処理は可能ですが、graphql-batchはバッチ処理時の
- .thenでLoaderの実行結果を使用したデータの加工ができること
- Loaderの結果に依存する別のLoaderの呼び出しができること
- Promise.allで複数のLoader(promise object)をまとめることができる
などのような後工程でのデータの変換や直列、並列なバッチ処理をシンプルに実現するための手段としてPromiseを使用しているようです。
まとめ
fieldに定義したLoaderがどのように遅延実行されバッチ処理が実現されるのかを、graphql-batchのソースコードを読みながら自分なりにまとめてみました。
自分への備忘として、最後にgraphql-batchによるバッチ処理のポイントをまとめておきたいと思います。
- 1fieldに対して、1Loader、Loaderはfield呼び出しで作成された複数のPromiseオブジェクトを保持する実装であること
- lazy_resolve実行時のPromiseオブジェクトのresolveは1Loaderにつき一度だけ実行され、同じものはスキップされる
- カスタムLoader実装時はperoformでのfulfill処理が後続処理(fieldへの値のマッピングやthenなどの後工程への値のパス)に向けて必須になる
- Promiseを使うことで、後工程でのデータの変換や直列、並列なバッチ処理をシンプルに実現するための手段としてPromiseを使用している
参考