Fork me on GitHub

SalesforceClient Sample

SObjectSync

概要

任意のSalesforceオブジェクトからのSELECT結果をRDBのテーブルに取り込むことができます。

テーブル上に指定のキーに対応するレコードが既にある場合はUPDATE、ない場合はINSERTという動作になります。

この機能はSQLSyncと対称的なものなのて関連クラスをbulkパッケージで作成していますが、実際にはBulkAPIは使用していません。
これはBulkAPIのクエリが参照項目(「SELECT Account.Name FROM Contact」という書式)をサポートしていないためです。

将来的に

  • 大量データの高速な移行が必要
  • 参照項目に対するクエリは不要(またはBulkAPIで参照項目がサポートされる)

という要件が発生した場合には内部的にBulkを使用するバージョンを作成するかもしれませんが、今のところその予定はありません。

動作確認はMySQL5.1とPostgreSQL9.2でのみ行っていますが、BatchUpdateをサポートするJDBCドライバであればおそらく何でも動きます。
MySQLでは「INSERT ON DUPLICATE KEY UPDATE」構文を使用するので他のDBMSよりも高速(なはず)です。
(SQL2003のMERGE構文には対応していません。)

基本的な使い方

基本的な使い方はSalesforceオブジェクトのフィールドとRDBテーブルのカラムをマッピングするだけです。

参照項目を使用する場合は「CreatedBy.Name」のようにドット区切りでフィールド名を指定します。

SalesforceClient client;
java.sql.Connection con;//移行対象のRDBのコネクション

SObjectSyncRequest request = new SObjectSyncRequest(con, "TestObject__c", "TestTable");
request.addFieldMapping("Id", "ID");
request.addFieldMapping("Name", "NAME");
request.addFieldMapping("Text__c","TEXT__C");
request.addFieldMapping("Int__c", "INT__C");
request.addFieldMapping("DateTime__c", "DATETIME__C");
request.addFieldMapping("CreatedBy.Name", "CREATE_USER");

//テーブルのキー項目(複数カラムが設定可能です。)
request.setKeyColumns("ID");

//移行処理の実行ポリシーの設定(後述)
request.setPolicy(SObjectSyncRequest.SObjectSyncPolicy.IgnoreRecordError);

//Salesforceへのクエリで一度に取得できるレコード数
//値を小さくすると何度もqueryMoreが実行されることになるのでパフォーマンス的には不利ですが
//メモリ使用量は小さくなります。
request.setBatchSize(200);

//移行処理の実行
//実行は別スレッドで非同期に行われるためresultが返ってきたタイミングではまだ処理は完了していません。
SObjectSyncResult result = client.syncSObject(request);

//実行の終了を待って結果を取得するにはSObjectSyncResult#getメソッドを実行します。
//(SObjectSyncResultはjava.util.concurrent.Futureインターフェースを実装しています。)
SObjectSyncInfo info = result.get();
System.out.println("Updated records: " + info.getSuccessCount());

上記のコードではTestObject__cの全レコードがTestTableに移行されます。

SELECT条件の指定

SalesforceオブジェクトのSELECT条件を指定する場合はSObjectSyncRequest#setWhereメソッドで自由にWHERE句の内容を記述できます。

WHERE句の中でパラメータのプレースホルダとして「?」を使用することも可能です。

SObjectSyncRequest request;
...

//最終更新日が10月1日以降で作成者がtest@flect.co.jpのレコードのみ対象
request.setWhere("LastModifiedDate > ? AND CreatedBy.Username = ?");
request.setParams(new SimpleDateFormat("yyyy-MM-dd").parse("2013-10-01"), "test@flect.co.jp");

複数フィールドからの計算結果をマッピングする

Salesforceオブジェクトのレコードに対して何らかの計算を行った結果をテーブルに格納することもできます。

その場合計算を行うクラスをSObjectSyncRequest#addFunctionMappingで登録します。

SObjectSyncRequest request;
...

//姓と名を連結してNAME列に格納する場合

//FirstName__cとLastName__cには移行対象のカラムが無いが、
//SELECT句には含めなければならないのでnullをマッピングする
request.addFieldMapping("FirstName__c", null);
request.addFieldMapping("LastName__c", null);

//NAME列に対して関数を登録
//SObjectSyncRequest.Functionは
//
//  Object evaluate(SObject obj)
//
//の1メソッドのみを持つインターフェースです。
request.addFunctionMapping("NAME", new SObjectSyncRequest.Function() {
    @Override
    public Object evaluate(SObject obj) {
        String fn = obj.getString("FirstName__c");
        String ln = obj.getString("LastName__c");
        return ln + fn;
    }
});

//オブジェクトの内容とは無関係なデフォルト値を登録することもできます。
//DefaultValueはコンストラクタ引数の値を返すだけのFunction実装です。
request.addFunctionMapping("DEL_FLG", new DefaultValue(0));

処理の実行ポリシー

RDBに対するコミットをどのタイミングで実行するか、また処理中のエラーをどのように扱うかをSObjectSyncRequest#setPolicyメソッドで設定できます。

ポリシーには以下の3種類があります。

CommitOnce

全レコードを1度にコミットする。移行処理はすべて成功するかすべて失敗するかのいずれかになります。

CommitPerQuery

大量データをSELECTする場合クエリは複数回のqueryMoreに分割されますが、そのクエリ毎にコミットされます。
例えば1000件のデータに対してBatchSize=200でクエリを実行した場合、結果セットは5回にわけて返ってきます。
その3回目の結果セットのなかにエラーとなるレコードが含まれていた場合、400件がコミットされてそこで中断(SQLExceptionをthrow)します。

IgnoreRecordError

個別のレコードエラーを無視します。
例えば文字列長の超過などRDBへのINSERT/UPDATEがエラーとなった場合でも処理を中断せずに続行します。
コミットの単位は原則クエリ単位ですが、エラーレコードがある場合は複数回に分けてコミットされます。

エラーレコードを個別にハンドルするためにはSObjectSyncronizerListenerを利用します。

SObjectSyncrhronizerListener

SObjectSynchronizerListenerは同期処理の実行中にいくつかのタイミングで発生するイベントをハンドルするためのListenerです。

どのようなタイミングでイベントが発生するかについてはSObjectSynchronizerEvent.EventTypeを参照してください。

開発者にとって最も重要なのはRECORD_ERRORイベントです。

これは実行ポリシーがIgnoreRecordErrorの場合にのみ発生するイベントで、個別のエラーレコード毎に発生します。

このイベントを利用してエラー行に対して何らかのアラートをあげることができます。

/** 各イベントまでの経過時間と発生したエラーを記録するListener */
class MyListener implements SObjectSynchronizerListener {
    
    private long startTime;
    
    public void handleEvent(SObjectSynchronizerEvent e) {
        long t = System.currentTimeMillis();
        
        //STARTEDイベントで処理の開始時間を記録する
        if (e.getType() == EventType.STARTED) {
            this.startTime = t;
        }
        
        //メッセージの作成
        StringBuilder buf = new StringBuilder();
        buf.append("type=").append(e.getType());
        
        //RECORD_ERRORの場合はErrorObjectが設定される
        if (e.getType() == EventType.RECORD_ERROR) {
            buf.append("\terrorObj=").append(e.getErrorObject().getId());
        }
        //SELECTEDの場合はQueryResultが設定される
        if (e.getType() == EventType.SELECTED) {
            buf.append("\tqueryResult=").append(e.getQueryResult().getCurrentSize());
        }
        //各種エラーイベントではExceptionが設定される
        if (e.getException() != null) {
            buf.append("\texception=").append(e.getException());
        }
        
        //このイベントまでの経過時間を出力
        buf.append("\ttime=").append(t - this.startTime).append("ms");
        System.out.println(buf);
    }
}