Apexで非同期ジョブのチェーン実行をサポートするフレームワークapex-chainableの使い方
こんにちは遠藤です。
ゆるっとSalesforceトーク #44ではApexの非同期ジョブをチェーン実行する方法について、Apexジョブチェーンの実装をサポートするライブラリー「apex-chainable」を中心に解説しました。
この記事は、イベントで話した内容と時間の関係で詳しく解説できなかったことを加えてまとめています。
はじめに
ApexでQueuealbeやBatchableなどをつなげて実行しなければならない場合、どのように実装していますか?
普通に実装するとQueueableであればfinalizer、Batchableであればfinishメソッドから次のジョブを起動するSystem.enqueueJobまたはDatabase.executeBatchを呼び出すことになります。しかし、処理が複雑になったり、連続実行するジョブが増えたりすると実装には細心の注意が必要です。さらに、非同期ジョブはApexテストも記述しにくいため、開発者の不安も大きなものとなります。
そんな課題を解決してくれるライブラリが、DF'24のセッション「Orchestrate Async Processes With Deferrable Apex Chainables」で紹介された「apex-chainable」です。
こちらのセッションの内容は、Salesforceの開発者向けyoutubeチャンネルSalesforce Developersの以下の動画にて公開されています。
今回は、このapex-chainableの解説を中心に、Apexで非同期ジョブをチェーンする際に気をつけなければならないポイント、apex-chainableが解決する課題、また解決できない課題について解説します。
さらに、apex-chainableをさらに発展させるフレームワークの案についても紹介します。
どんなときに非同期ジョブをチェーン実行する必要があるのか?
主なユースケースは、以下のようなものが考えられるかと思います。
処理Aの完了が処理Bに依存している
外部システムからのデータのインポートをApexで実装する必要がある
非同期ジョブの並行実行数制限5があるので時間がかかってもなるべくシリアルに実行したい
大量データの更新・削除処理が複数オブジェクトにまたがって必要となる
ガバナ制限の回避
処理Aの完了が処理Bに依存している
外部のAPIから取得したレコードをオブジェクトに保存後、更にデータを加工するために別の処理を起動するようなケースです。
実際に弊社製品のCalsketのGoogle同期機能は、Google ClanedarからGET /eventsしオブジェクトに保存するQueueableジョブに続けて、オブジェクトに保存したGoogleのイベントをSalesforceの行動に変換するBatchableジョブを複数呼び出す流れで実装されています。
外部データのインポートをApexで実装する必要がある
前述のCalsketの例もそうですが、APIから取得した外部データをオブジェクトに同期するような処理をApexで実装しなければならないケースがこれにあ当たります。API叩くクライアントなどをSalesforce外に用意することが難しい場合ですね。通常の案件では、Salesforce Connectを利用したり、データローダーやSalesforceのAPIを外から叩く構成が取りやすいのであまりやらないと思いますが、特にAppExchangeアプリの場合は、汎用性のためにそういった構成を取ることも多そうです。
また、この構成でデータ量が多くなった場合はAPIの呼び出しのページング処理が必要なためQueueableジョブを自己呼び出しでチェーンする必用がでてきます。さらに、大抵は前処理後処理が伴うため、付随する非同期ジョブを前後にチェーンする必要が出てきます。
非同期処理は組織で並行実行は5までなのでなるべくシリアルに実行したい
各非同期ジョブが独立して実行可能な場合は、一度にSystem.enqueueしてしまえば良さそうなケースもありますが、組織内で同時に実行可能な非同期ジョブ実行の5までなので、あまり他の処理に影響を与えないようにしたいケースです。
大量データの更新・削除処理が複数オブジェクトにまたがって必要となる
Batchableジョブをチェーンするケースです、startメソッドが返すべき対象となるオブジェクトが複数ある場合、かつシリアルに実行しなければならないケースなどがあたります。
ガバナ制限の回避
ガバナ制限の回避のために、非同期ジョブが必要となるケースです。セッションで紹介されている例のようにトリガーから非同期ジョブが呼び出されるケースなどが多いかもしれません。
非同期ジョブのチェーン実行における実装の課題
はじめにでも言及しましたが、非同期ジョブチェーンの実装には以下のような課題があり開発者は常に不安を抱えることになります。
コードの見通しがわるい
各処理のデータ共有がむずかしい
Apexテストが書きにくい。またはそもそも書けないパターンもある
コードの見通しが悪くなる
こちらは、apex-chainableのREADMEの「Without Chainable」にも例が載っています。
例えばBatchableを実装したクラスBatch1、Batch2、Batch3を連続で実行するように実装すると以下のようになります。
// Batch1のfinishメソッド
public void finish(Database.BatchableContext BC) {
Database.executeBatch(new Batch2(), 1000);
}
続いて
// Batch2のfinishメソッド
public void finish(Database.BatchableContext BC) {
Database.executeBatch(new Batch3(), 200);
}
3つくらいならなんとかなりそうですが、すべてのApexのfinishメソッドを確認しないとどの順番で実行されるかわからなく不安です。
各処理のデータ共有がむずかしい
多くの場合は、バッチ処理は定期実行することが多いと思います。
非同期処理が完了した場合は最後に実行結果を通知する処理が必要です。
先程の例Batch1、Batch2、Batch3がチェーン実行される場合、Batch3のfinishメソッドには通知送信が記述されるのではないでしょうか。
// Batch3のfinishメソッド
public void finish(Database.BatchableContext BC) {
Messaging.SingleEmailMessage mail = new Messaging.SingleEmailMessage();
mail.setTargetObjectId(UserInfo.getUserId());
mail.saveAsActivity = false;
mail.setSubject('バッチ処理が完了しました');
mail.setHtmlBody('実行は成功です<<Batch1、Batch2、Batch3の実行結果が含まれる>>');
Messaging.sendEmail(new List<Messaging.SingleEmailMessage>{ mail });
}
さらに、Batch1とBatch2の実行結果をBatch3で作成するメール通知の本文に含めたい場合、どう実装したら良いでしょうか?
普通に実装するとQueueBでは必要のないデータもコンストラクタなどで渡すようにして、バケツリレーすることになりそうです。。
// Batch1のfinishメソッド
public void finish(Database.BatchableContext BC) {
Database.executeBatch(new Batch2('<通知に含めるBatch1の結果>'), 1000);
}
// Batch1のfinishメソッド
public void finish(Database.BatchableContext BC) {
Database.executeBatch(new Batch3('<通知に含めるBatch1の結果>','<通知に含めるBatch2の結果>'), 200);
}
これは極端ですが、似たようは実装をしてしまいがちです。
Apexテストが書きにくい
finishで次のバッチジョブを起動するようなコードの場合、Apexテストは非常に書きにくくなります。
例えば、各ジョブでコールアウトが伴う場合、関連するApexテストクラス全て後続処理用のHttpCalloutMockやテスト用レコードを用意するコードを長々と記述する必用が出てきてしまいます。
各非同期ジョブが次のジョブを正常に呼び出せるかのテストの記述も大変です。
また、そもそも以下のパターンのApexテストは書けないのも頭が痛い問題です。
Batchableは、Apexテストで実行可能なバッチは1つのみ
Queueuableは自己呼び出しができない
futureも絡む制限もあったような
apex-chainableが解決すること
前置きがだいぶ長くなりましたが、これらの課題を解決するのが冒頭で紹介したapex-chainableというライブラリです。
まずは、先ほどの課題「コードの見通しが悪くなる」と「各処理のデータ共有がむずかしい」の解決から見ていきます。
非同期処理の実行順序を事前に指定できる
例えば、QueueableジョブQueue1、Queue2に続いてBatchableジョブBatch3を連続で実行しなければならない場合、apex-chainableを利用すると以下のように1行で実行順序を指定できます。
new Queue1()
.then(new Queue2())
.then(new Batch3())
.execute();
とても見通しが良いですね。一目で非同期ジョブの実行順を読み取ることができます。
また、ジョブの入れ替えも簡単です。
Dreamforceのセッションで紹介されていたユースケースでは、商談をクリーンナップするOppClearnerと完了通知?を送信するOppCounterNotifierをチェーンしていました。これに定期実行と手動実行で通知メッセージを変更したいという要件が追加されたとします。通常はBatchableのfinishで実行コンテキストによる場合分けをし通知内容を出し分けるように実装しますが、apex-chainableを使用すると以下のようにジョブチェーン起動時に指定することができます。
以下、定期実行に通知を送信するOppNotifierCronTrigger、即時実行に通知を送信するOppNotifierOndemandを実装し、ジョブチェーン起動時に指定しています。
定期実行時
new OppClearner()
.then(new OppNotifierCronTrigger())
.execute();
即時実行時
new OppClearner()
.then(new OppNotifierOndemand())
.execute();
SharedVariablesでバケツリレーを回避
apex-chainableには、各非同期処理でデータを共有する仕組みを提供しています。例えば、Queue1 > Queue2 > Queue3と連続して実行する場合に、Queue1の結果をQueue3で利用したい場合、普通に実装するとQueue2では参照しないのにもかかわらず逐次変数をバケツリレーする必要があります。apex-chainableでは共有データをsetSharedメソッドに渡すと後続のジョブでも利用できるようになります。
new Queue1()
.setShared('result', new Money(0))
.then(new Queue2())
.then(new Batch3())
.execute();
注意点としては、SharedVariablesに入れるインスタンスはシリアライズ可能であることです。
次に説明するexecuteDeferredを利用する場合は、さらに制限がありJSONにシリアライズ、デシリアライズできるクラスのみが使用可能となります。
ちなみに、JSON用のApexクラスを利用したい場合は、setDeferredSharedメソッドをオーバーライドしてデシリアライズするロジックを追加するか、以下のようにgetSharedしたMap<String, Object>の値をシリアライズ後、デシリアライズをし直します。
Money mon = (Money) JSON.deserialize(JSON.serialize(this.getShared('result'), Money.class);
Apexテストは各ジョブごとに記述ができる
非同期ジョブの実行シーケンスはapex-chainableの責務になります。つまり、各非同期ジョブが次のジョブを正しく起動するか?のようなApexテストは不要になります。
シーケンスが正しくセットされているかの統合テストは必要ですが、各ロジックは前後の処理に依存しない形で実装することができます。
つまり、Queue1 > Queue2 > Queue3とチェーン実行する場合であっても、Queue2のロジックのみをテストしたい場合には以下のようにQueue2のみexecuteすればOKです。
@IsTest
static void execute() {
Test.startTest();
new Queue2().execute();
Test.stopTest();
Assert.areXXX();
}
apex-chainableの各非同期ジョブの実装方法
各非同期ジョブを実装するには、Queueableジョブは、ChainableQueueable、BatchableジョブはChainableBatchはを継承するクラスとして定義します。
Queueableの実装方法
QueueableはChainableQueueを継承します。実装するメソッドはexecute(Context ctx)メソッドのみです。このexecute(Context ctx)メソッドは、親のChainableQueueが実装するQueueableインターフェースのexecute(QueueableContext ctx)から呼び出されます。引数のContextはChainableの内部クラスでQueueableContextを保持しています。コンテキストが必用な場合はキャストして使用します。
以下、System.debugを出力しているだけの実装例です。
public with sharing class Queue1 extends ChainableQueueable{
override protected void execute(Context ctx) {
System.debug('Queue1 is running.');
}
}
Batchableの実装方法
BatchableはChainableBatchableを継承します。Batchableインターフェースのstart、execute、finishメソッドに対応するContextを受けるメソッドを3つとも実装する必用があります。
public with sharing class Batch3 extends ChainableBatch {
override protected Iterable<Object> start(Context ctx) {
return [
SELECT Id FROM Opportunity
];
}
override protected void execute(Context ctx, Iterable<Object> scope) {
delete (sObject[]) scope;
}
override protected void finish(Context ctx) {}
}
バッチサイズは、デフォルト200です。変更したい場合はbatchSizeメソッドをオーバーライドします。
apex-chainableの動作の仕組み
各Chainableは、次の非同期ジョブをnext変数に保持していますが、Apex非同期ジョブは別プロセスで実行されるのに複数のチェーンが保持されているのはちょっと不思議だと思いませんか?
ここではそのあたりをQueuableの実装を中心に解説していきます。
非同期ジョブチェーンをどうやって実現しているのか?
まずapex-chainableのクラス構成ですが、Sshedulable、Batchable、Queueableを共通に扱えるように抽象化したChainableというクラスがあり、こちらのクラスに非同期ジョブをチェーンする仕組みが詰まっています。
前節の例ではQueue1 > Queue2 > Queue3と実行するようにthenメソッドを呼び出して指定していしました。以下は、thenの中身ですが、Queue1のnextにQueue2のインスタンスが、Queue2のnextにQueue3のインスタンスがというように連結リスト(LinkedList)のデータ構造で次に実行されるジョブを数珠つなぎに保持しています。
public Chainable then(Chainable successor) {
if(next != null) {
next.then(successor);
}
else {
next = successor;
next.previous = this;
next.sharedVariables = sharedVariables;
}
return this;
}
Queueableはなぜ次のジョブを保持できるのか?
Queueableはenqueue後に別プロセスで実行されるためQueue1 > Queue2 > Queue3の参照関係が保持されることが想像しにくいのですが、enqueue時にメンバー変数はシリアライズされそのまま保持されます。つまり、next > next > nextの参照先のインスタンスが保持されたままシリアライズされるので、ジョブ起動時にはインスタンスの参照関係も自動的に復元されるというわけです。
一方、Batchableについては、Database.Statefulインターフェースを指定すればメンバー変数を引き継いでくれる機能を使っています。こちらは普通ですね。
非同期処理の開始を遅延できるexecuteDeferredとは?
最後にapex-chainableが売りとしている非同期ジョブチェーンの起動を遅延させる機能について解説します。
executeではなくexecuteDeferredメソッドを呼び出して非同期ジョブチェーンを開始すると、内部ではプラットフォームイベントを登録します。それをプラットフォームトリガーフローで購読し、プラットフォームイベントから非同期処理チェーンを復元して実行します。
これが便利なケースは、セッションで紹介されているようにトランザクション内で商談と取引先の更新が発生し、かつ商談と取引先の更新からトリガーされる自動プロセスでそれぞれChainablerを起動したいケースです。
以下、処理イメージのシーケンスを描いてみました。あるトランザクションn処理があり非同期ジョブチェーンChainbale1とChainable2が、商談と取引先のApexトリガーから起動されるとします。
しかし、Chainbale1とChainable2を同時に起動してしまうと、下のトランザクションも含めてそれぞれの処理が競合してしまう恐れがあります。
そこで、遅延実行executeDeferredの出番です。商談の更新で起動されるOpptyTriggerではOppCleanerとOppCounterNotifierをexecuteDeferredで起動します。
// OpptyTriggerの処理
new OppCleaner()
.then(new OppCounterNotifier())
.executeDeferred()
取引先の更新から起動されるAccountTriggerではAddressValidatorとPhoneValidatorをexecuteDeferredで起動します。
// Chainable2
new AddressValidator()
.then(new PhoneValidator())
.executeDeferred();
executeDeferredを実行すると内部的にはChainableのインスタンスごとのプラットフォームイベント「Chainable__c」が保存されます。
上記のケースでは2レコードずつ合計4レコードのプラットフォームイベントが登録されます。
プラットフォームイベントのコミットはトランザクション完了時なので、「Chainable__c」を購読するプラットフォームトリガーフローでは4レコードの一括処理としてイベントが呼び出されます。
Apexアクションに登録されるInvocableMethodは以下のrebuildAndExecuteChainです。
引数のdeferredLinksは、アクションの入力値用の内部クラスDeferredChainLinkクラスの配列で、deferEventにプラットフォームイベント「Chainable__c」を保持しています。
@InvocableMethod(label='Rebuild chain from deferred chainables and execute')
public static List<DeferRebuildResult> rebuildAndExecuteChain(List<DeferredChainLink> deferredLinks) {
rebuildAndExecuteChainでは、プラットフォームイベントよりChainableのインスタンスを再生成してexecuteメソッドを実行します。具体的に以下の4つのチェーンをexecuteメソッドで実行した場合と同じ処理となります。
new OppCleaner()
.then(new OppCounterNotifier())
.then(new AddressValidator()
.then(new PhoneValidator())
.execute()
apex-chainableでは解決できていない課題
ここまで、apex-chainableについて一通り解説してきましたが、apex-chainableでは解決できていない課題についてまとめておきます。
apex-chainableを作った方は前述executeDeferredの例のようなトリガーからの非同期チェーンをもっとも解決したかったのではないかと想像しています。apex-chainableはとてもシンプルで便利ですが、私が関わっている自社のプロダクトや案件では、非同期ジョブチェーンを実装するにあたっては以下のようなことを考慮する必用がありました。
TransactionFinalizerを利用したい。エラーハンドリングをより堅牢にしたいため。
自己呼び出しするQueueableには対応する必要がある。APIのページング処理のため。
thenでチェーンを定義するのは実は使いにくい。システム設定によって非同期ジョブチェーンの組み換えが必要なケースなどがあるため。
SharedVariablesはシンプルだが、グローバル変数的な問題が発生しそう。
Developer Editionに対応したいときもある。引数なしのコンストラクタ問題との天秤になるが。
また、以下の機能はあまり必要になったことはありません。
遅延実行は便利だが利用に難あり
プラットフォームイベントの定義数には制限があるためAppExchangeアプリでは使いにくい
SharedVariablesにカスタムApexクラスを使う場合は、setDeferredSharedの実装する必要がありきれいに実装できないかも
引数なしコンストラクタのみでは対応できないものもある
ChainableSchedulableは利用シーンが思いつかない
Chainable簡易・発展版を考えてみる
以下のクラス図は、実際に動いている非同期ジョブチェーン実装を踏まえて理想を加えて書き起こしたものです。
こちらは、実装を整理して別の機会に紹介できればと思います。
利用イメージ
ChainableOrchestrator orchestrator = new ChainableOrchestrator();
orchestrator.add(new GoogleEventRetrieverQueue());
orchestrator.add(new GoogleEventConverterBatch());
orchestrator.add(new GoogleRecurrenceEventConverterBatch());
orchestrator.start();
解決したこと
apex-chainableで実装している以下の機能はあまり使わないので割愛
deffered
スケジューラへの対応
Queueableはfinalizerを使うようにする
Queueableの再帰呼び出しに対応
ただし、自己呼び出し時にnext、sharedVariables、errorHandlerの引き継ぎ方は要検討
thenが使いにくい問題への対処は管理クラスOrchestratorを導入
エラーハンドラーの強化
ErrorHandleをセット可能に(handleErrorメソッドを用意して子クラスでオーバーライドする方が良いケースもありそう)
エラー時にジョブチェーンを継続するのか中断するのかの判定をどうハンドリングするかについては未対応
DEへの対応基本路線は非対応SharedVariablesのシリアライズ・デシリアライズの仕組みを詰めれば可能かもしれない補足で言及していますが、最大スタック深度が設定可能になったためDEのための特別な対応は不要となりそうです
おわりに
私は、いくつかの案件でapex-chainableと似たようなしくみを作ったことがありますが、はじめに非同期ジョブチェーンを実装しようとしたきっかけは、開発中にたまたまQueueableにHttpClientのインスタンスを保持しているミスがあり、Javaのシリアライズエラーがそのまま出力されているのを見てなるほど!と思った記憶があります。このあたりは、「apex-chainableの動作の仕組み」のところで解説させていただきました。
また、実装ではデザインパターンのChain-of-responsibilityパターンをApexに適用できないか?と考えましたがApexはリフレクションが引数なしのコンストラクタしかサポートされていないので難しかったのを覚えています。セキュリティの対策でリフレクションはサポートされていないという認識ですが、一般的なデザインパターンが適用できないのは結構辛いものがありますね。まあとはいえ、その制限の中で工夫するのもまた面白いところではあります。
以上、長くなりましたがApexの非同期ジョブを実装する際に、「apex-chainableのアイディアが次の案件で使えるかも」と思っていただけたら幸いです。
補足
非同期ジョブを実装するにあたっていつも気をつけることを補足として列挙しておきます。
Tips: Queueableのメンバー変数にシリアライズできないクラスを保持したい
Queueableのメンバ変数にMessaging.SingleEmailMessageやHttpClientなどシリアライズできないクラスを保持する必要がある場合は、transient修飾子を付けてシリアライズの対象外としておけばOKです。
例えば、以下は通知用の非同期ジョブですが、Apexテストで送信するメールの内容をチェックするためのSingleEmailMessageのインスタンスをメンバー変数に保持しています。
public with sharing class MasterImporttNotificationQueue extends ChainableQueue {
@TestVisible
transient Messaging.SingleEmailMessage mail;
...
}
制限: Developer Editionのチェーン数は最大5
と思っていましたが、次のmaximumQueueableStackDepthを指定することで回避できるらしいです。
Tips: 本番組織でも最大チェーン数を設けるべきか?
Winter ’24で「チェーニングされたキュー可能ジョブの最大深度の設定 (正式リリース)」が可能となりました。
これによりプログラムのミスにより止まらないQueueableジョブというのを防げますw。
不要な制限を設けないというのはプログラミングの定石ですが、止まらない非同期ジョブチェーンは怖いので十分に大きな最大チェーン数は設けて置くのが良いかもしれないですね。デフォルトを厳し目にしておいてカスタマイズできるとより良さそうです。
で、前後しますが、リリースノートにも以下のように記載されていますね。なんとまあ。
制限: 非同期ジョブの1日実行数は25万回
これ結構と引っかかります。気をつけましょう。
制限: ApexテストではBatchableのバッチ実行数は1のみ
これは結構辛いです。。
制限: ApexテストではQueueableはチェーンに制限はある?
ApexテストでQueueableは自己呼び出しは以下のエラーが発生する