Salesforce Pub/Sub APIを使って外部アプリから変更データキャプチャとプラットフォームイベントを購読/公開するデモを作ってみた
こんにちは遠藤です。
ゆるっとSalesforce #38では、SalesforceのEvent BusについてとEvent Busにつながる機能を整理した内容を発表させていただきました。
この記事ではイベント後半に紹介したデモについて、手元で試すためのセットアップ手順とコードの解説を中心にまとめています。
前半のEvent Busの概要については以下の記事にて解説しています。こちらの記事も参考にして下さい。
今回紹介したデモについて
イベントで紹介したデモは、codeLive「Introducing the New gRPC-based Pub Sub API | Developer Quick Takes」で紹介されていたデモを再現したものになっています。
こちらデモはTrailheadでも紹介されているebiles-lwcに変更データキャプチャとプラットフォームイベントをPub / Sub APIで処理する外部アプリ(node.js)を追加したものとなっています
デモのコードは公開されていなかったため、今回動画の解説を参考に最小限の構成で実装してみました。ebiles-lwcリポジトリを私のGithubアカウントにforkしたリポジトリhrendoh/ebikes-lwcに、追加したコードを「pubsub-example」ブランチにpushしてあります。以下で解説するデモ全体のコードはこちらのリポジトリを参考にしてください。
ちなみに、イベントで紹介したデモは上記のyoutube動画の内容とまったく同じですので、動きは自体は動画を参照ください。
デモのシステム構成
デモのシステム構成は以下のようなイメージです。
Salesforce組織に以下の変更データキャプチャとプラットフォームイベントが定義されています。
変更データキャプチャ: オブジェクト「Reseller Order(Order__c)」
プラットフォームイベント: Manufacturing Event(Manufacturing_Event__e)
また、プラットフォームイベント「Order__ChangeEvent」には以下の項目が定義されています。
Order_Id__c: オブジェクト「Reseller Order(Order__c)」のレコードのId
Status__c: オブジェクト「Reseller Order(Order__c)」の「Status(Status__c)」項目の値
システム的なイベントの発行と購読の流れは以下のようになります。
オブジェクト「Reseller Order(Order__c)」のレコードページで「Status」を「Submitted to Manufacturing」に変更し保存します。
「Reseller Order(Order__c)」変更データキャプチャイベント 「Order__ChangeEvent」が公開されます。
node.jsのExpressアプリではPub / Sub APIで、変更データキャプチャイベント「Order__ChangeEvent」を購読しています。イベントを受信して「Status」が「Submitted to Manufacturing」だった場合、Webの画面で承認対象のレコード上方を表示します。
「Approve」すると、Pub Sub / APIでプラットフォームイベント「Manufacturing_Event__e」を公開します。
レコードページに配置されているLWC「orderStatusPath」はプラットフォームイベント「Manufacturing_Event__e」を購読していて、変更を受信したらレコードを読み込み直します。
デモの動かし方
デモを自分の環境で動かしたい方は以下の手順でセットアップできますのでぜひ試してみてください。
Salesforce組織のセットアップ
まずは、ebikes-lwcをスクラッチ組織にデプロしてセットアップします。リポジトリのREADMEのセットアップ手順を参考にセットアップしてください。
① リポジトリをクローンしてpubsub-exampleブランチをチェックアウト
git clone git@github.com:hrendoh/ebikes-lwc.git
cd ebikes-lwc
git checkout pubsub-example
② Dev Hub組織にログイン
sf org login web -d -a myhuborg
③ スクラッチ組織を作成
sf org create scratch -d -f config/project-scratch-def.json -a ebikes
④ 接続アプリケーションのコンシューマ鍵を変更する
force-app/main/default/connectedApps/E_Bikes_Manufacturing.connectedApp-meta.xml を開き、consumerKeyを任意の値に変更しておきます
<consumerKey>REPLACE_YOUR_CONSUMER_KEY</consumerKey>
⑤ スクラッチ組織にソースをプッシュ
sf project deploy start
⑥ 権限セット「ebikes」と「Walkthroughs」をデフォルトユーザーに割り当て
sf org assign permset -n ebikes
sf org assign permset -n Walkthroughs
⑦ サンプルデータをインポート
sf data tree import -p ./data/sample-data-plan.json
⑧ 接続アプリケーション「E-Bikes Manufacturing」のポリシー編集で「クライアントログイン情報フロー」> 「別のユーザーとして実行」にデフォルトユーザーをセットして保存
スクラッチ組織側は以上です。
デモでは、Experience Cloudは使用しないのでサイトの公開などは割愛しています。
node.js外部アプリのセットアップと起動
次に、node.jsのアプリケーションはクライアント側はReact、サーバー側はExpressのアプリと分かれています。起動するには以下の手順でセットアップします。
まず、クライアントのReactアプリを以下の手順でビルドします。
① ターミナルでsrc/client ディレクトリを開く
cd src/client
② npmパッケージをインストール
npm install
③ Reactアプリをビルド
npm run build
続いて、Expressサーバーを以下の手順で起動します。
① ターミナルでsrc/server ディレクトリを開く
cd src/server
② npmパッケージをインストール
npm install
③ 環境変数を「.env」ファイルに設定
SALESFORCE_AUTH_TYPE=oauth-client-credentials
SALESFORCE_LOGIN_URL=https://hoge-fuge-9999-dev-ed.scratch.my.salesforce.com
SALESFORCE_CLIENT_ID=3MVG9vuHjyLKuxlGWYkGXJPJTHXgUMJrvcAfvc5FSE1RSllGQUIDaW13LOninWxwBS81.ZZWvwyCDvpZgKW6e
SALESFORCE_CLIENT_SECRET=9180F01C6B3477104F9B7DD0F2925107973767EA7B9C5903D2F382E6176B3EE5
PUB_SUB_ENDPOINT=api.pubsub.salesforce.com:7443
SALESFORCE_USER_ID=005Hy000007UXOAIA4
SALESFORCE_LOGIN_URL: ログインURLを指定します。スクラッチ組織の設定の「私のドメイン」を開き「[私のドメイン] の名」の値で<my-domain>の箇所を置き換えてください。または、「sf org display user」コマンドの出力の「Instance Url」が私のドメインです。
SALESFORCE_CLIENT_ID: 接続アプリケーションのコンシューマー鍵を指定します。
SALESFORCE_CLIENT_SECRET: 接続アプリケーションのコンシューマーの秘密を指定します。
PUB_SUB_ENDPOINT: 「api.pubsub.salesforce.com:7443」を指定します。
SALESFORCE_USER_ID: デフォルトのユーザーのレコードIDを設定します。ユーザーのIDは「sf org display user」でも確認できます。
Expressサーバーを以下のコマンドで起動します。
node index.js
http://localhost:3000をブラウザで開くと以下の空のUIが表示されます。
デモの動作確認
次にデモの動作確認をしていきます。
スクラッチ組織にログインして、E-Bikesアプリケーションを開き、「Reseller Order」タブで新しいレコードを作成します。
以下のように変更データキャプチャイベント「Order__ChangeEvent」のペイロードが表示されればOKです。
更に、ステータスパスのコンポーネント「orderStatusPath」の「Submitted to Manufacturing」をクリックします。
レコード更新の変更データキャプチャイベントを受信して、Webの画面にレコードの情報が表示されます。
「Approve」をクリックすると、プラットフォームイベント「Manufacturing_Event__e」が公開されます。レコードページに配置されているLWC「orderStatusPath」はこのイベントを購読しているので、イベントを受信するとレコードを更新します。
デモの動作確認は以上です。
コードの解説
ここから、デモのコードを解説していきます。
はじめにPub / Sub APIを利用するnode.jsのExpressアプリの解説をした後に、レコードページに配置されているLightinng Webコンポーネントについて解説します。
node.jsのExpressアプリはsrc/server ディレクトリにあります。UIのReactアプリもsrc/clientにありますが、外部アプリの画面はSalesforceに関わる内容は無いため解説は割愛します。
Node client for the Salesforce Pub/Sub APIについて
node.js用のPub / Sub APIクライアントは、codeLiveの動画で解説をしているOzilさんが作成したパッケージを使っています。
こちらは、公式のライブラリではないため利用は自己責任になってしまいます。
Node client for the Salesforce Pub/Sub APIの設定
salesforce-pubsub-api-clientを利用するための設定を見ていきます。
salesforce-pubsub-api-clientは環境変数から認証に必要な情報を読み込みます。セットアップ手順にも記載しましたが、デモのExpressアプリはnode.jsのdotenvを利用してますので「.env」ファイルで環境変数を定義することができます。「.env」ファイルについて改めて見ていきます。
SALESFORCE_AUTH_TYPE=oauth-client-credentials
SALESFORCE_LOGIN_URL=https://page-page-3162-dev-ed.scratch.my.salesforce.com
SALESFORCE_CLIENT_ID=3MVG9vuHjyLKuxlGWYkGXJPJTHXgUMJrvcAfvc5FSE1RSllGQUIDaW13LOninWxwBS81.ZZWvwyCDvpZgKW6e
SALESFORCE_CLIENT_SECRET=9180F01C6B3477104F9B7DD0F2925107973767EA7B9C5903D2F382E6176B3EE5
PUB_SUB_ENDPOINT=api.pubsub.salesforce.com:7443
SALESFORCE_AUTH_TYPE
認証方式を指定します。上記では「oauth-client-credentials」を指定していますが、他に以下の認証方式を指定可能です。
OAuth 2.0 クライアントログイン情報フロー:「oauth-client-credentials」を指定
OAuth 2.0 JWT ベアラーフロー: 「oauth-jwt-bearer」
OAuthのユーザー名/パスワードフロー: 「username-password 」
SALESFORCE_LOGIN_URL
ログインURLを指定します。ログインURLは「https://<私のドメイン>.my.salesforce.com」の形式です。私のドメインは設定でも確認できますが、「sf org display user」コマンドで確認するのが早くておすすめです。
SALESFORCE_CLIENT_ID
接続アプリケーションのOAuth「コンシューマー鍵をセットします。
SALESFORCE_CLIENT_SECRET
接続アプリケーションのOAuth「コンシューマーの秘密」をセットします。
Node client for the Salesforce Pub/Sub APIの初期化と接続
Pub / Sub APIのエンドポイントへは以下の2行で接続できます。
const client = new PubSubApiClient();
await client.connect();
※ 接続エラー時にはリトライするなど実際にはエラー処理が必要です。
変更データキャプチャイベントの購読
「Reseller Order」の変更データキャプチャイベント「Order__ChangeEvent」を購読するコードは以下のとおりです。
// 変更データキャプチャのトピック
const ORDER_CDC_TOPIC = '/data/Order__ChangeEvent';
// 「Reseller Order」の変更データキャプチャイベントを購読
const eventEmitter = await client.subscribe(ORDER_CDC_TOPIC);
// 受信したイベントを処理するコールバック
eventEmitter.on('data', (event) => {
...
// イベント種別が「UPDATE」かつレコードのステータスが「Submitted to Manufacturing」
if (
event.payload.ChangeEventHeader.changeType === 'UPDATE' &&
event.payload.Status__c === 'Submitted to Manufacturing'
) {
// クライアントにメッセージを送信
ws.send(
`{"recordIds": ${JSON.stringify(event.payload.ChangeEventHeader.recordIds)}}`
);
}
}
イベントを受信するとeventEmitter.onコールバックが呼び出されます。
変更種別が「UPDATE」かつ項目「ステータス(Status__c)」の値が「Submitted to Manufacturing」のときだけブラウザにWebSocketでメッセージを送信するようにしています。
ブラウザ側ではメッセージを受け取るとレコードの情報とともに「Approve」ボタンを表示しています。
参考: Authentication | Get Started | Get Started | Pub/Sub API | Salesforce Developers
プラットフォームイベントの公開
画面で「Approve」をクリックされると、プラットフォームイベント「Manufacturing Event(Manufacturing_Event__e)」を公開します。
if (msgObj.status === 'Approved by Manufacturing') {
const payload = {
CreatedDate: new Date().getTime(),
CreatedById: SALESFORCE_USER_ID,
Order_Id__c: { string: msgObj.recordId },
Status__c: { string: msgObj.status }
};
const publishResult = await client.publish(
MANUFACTURING_PE_TOPIC,
payload
);
console.log('Published event: ', JSON.stringify(publishResult));
}
SALESFORCE_USER_IDは.envで指定したユーザーIdです。本来はREST APIで取得すべきですが今回は割愛しました。
Lighting Webコンポーネント「orderStatusPath」
orderStatusPathは、E-Bikes LWCにもともとあるコンポーネントです。
このコンポーネントはプラットフォームイベント「Manufacturing Event(Manufacturing_Event__e)」を購読しています。
関係するコードは以下の箇所です。
// empAPIのインポート
import {
subscribe,
unsubscribe,
onError,
setDebugFlag,
isEmpEnabled
} from 'lightning/empApi';
// プラットフォームイベント定義を保持する定数
const OBJECT_API_NAME = 'Order__c';
const FIELD_API_NAME = 'Status__c';
const MANUFACTURING_EVENT_CHANNEL = '/event/Manufacturing_Event__e';
export default class OrderStatusPath extends LightningElement {
async connectedCallback() {
// EMP APIが利用可能か確認
const isEmpApiEnabled = await isEmpEnabled();
// プラットフォームイベント「Manufacturing Event」を購読
try {
this.subscription = await subscribe(
MANUFACTURING_EVENT_CHANNEL,
-1,
(event) => {
this.handleManufacturingEvent(event);
}
);
} catch (error) {
this.reportError('EMP API error: failed to subscribe', error);
}
}
disconnectedCallback() {
if (this.subscription) {
// 購読を停止
unsubscribe(this.subscription);
}
}
以下、「Manufacturing Event」を受信するとメソッド「handleManufacturingEvent」が呼び出されます。さらに、このメソッドから呼び出される「setPicklistValue」内で「ステータス(Status__c)」に新しい値をセットし「updateRecord」を呼び出しています。「updateRecord」に成功すると画面のステータスの表示も「Approved by Manufacturing」 に切り替わります。
handleManufacturingEvent(event) {
// プラットフォームイベントに指定されているレコードが現在のレコードと一致する場合のみ
// 選択リスト項目「ステータス(Status__c)」を更新する
if (event.data.payload.Order_Id__c === this.recordId) {
this.setPicklistValue(event.data.payload.Status__c);
}
}
async setPicklistValue(value) {
// 更新するレコードの用意
const fields = {
Id: this.recordId
};
fields[FIELD_API_NAME] = value;
const recordInput = { fields };
// レコードを再読み込みする
try {
await UPDATERECORD(recordInput);
this.dispatchEvent(
new ShowToastEvent({
title: 'Order Updated',
message: `Order status set to "${value}"`,
variant: 'success'
})
);
} catch (error) {
this.reportError(
`Failed to update order status to "${value}"`,
error
);
}
}
おわりに
以上、外部アプリからPub / Sub APIを利用してリアルタイムなユーザーエクスペリエンスの実装方法を紹介してきました。
実運用に乗せるには、イベントの公開に失敗した場合や受信したイベントの処理に失敗した場合にリトライする処理などエラーハンドリングを実装する必要がありますが、Pub / Sub APIを利用したイベント駆動なアプリケーションの作り方をおさえられたのではと思います。
参考情報リンク
関連リポジトリ
公式ブログ記事など