定義
db.collection.watch( pipeline, options )重要
mongosh メソッド
このページでは、
mongoshメソッドについて説明します。ただし、データベースコマンドや Node.js などの言語固有のドライバーのドキュメントには該当しません。データベースコマンドについては、
$changeStream集計ステージのaggregateコマンドを参照してください。MongoDB API ドライバーについては、各言語の MongoDB ドライバー ドキュメントを参照してください。
レプリカセットとシャーディングされたクラスターのみ
コレクション上で変更ストリーム カーソルを開きます。
Parameterタイプ説明pipeline配列
任意。次の集計ステージの1つ以上で構成される集計パイプライン:
変更イベント出力をフィルタリング/修正するためのパイプラインを指定します。
MongoDB 4.2 以降では、変更ストリームの集計パイプラインでイベントの _id フィールドが変更される場合、変更ストリームで例外がスローされるようになります。
optionsドキュメント
optionsドキュメントには、次のフィールドと値を含めることができます。フィールドタイプ説明resumeAfterドキュメント
任意。
watch()再開トークンで指定された操作の後に再開通知を試行するように に指示します。各変更ストリームのイベント ドキュメントには、
_idフィールドとして再開トークンが置かれます。変更後に再開する操作を表す変更イベント ドキュメントの_idフィールド全体を渡します。resumeAfterは、startAfterおよびstartAtOperationTimeと排他関係にあります。startAfterドキュメント
任意。
watch()再開トークンで指定された操作の後に新しい変更ストリームの開始を試行するように に指示します。無効化イベント後に通知を再開できるようにします。各変更ストリームのイベント ドキュメントには、
_idフィールドとして再開トークンが置かれます。変更後に再開する操作を表す変更イベント ドキュメントの_idフィールド全体を渡します。startAfterは、resumeAfterおよびstartAtOperationTimeと排他関係にあります。fullDocumentstring
任意。 デフォルトでは 、アップデートされたドキュメント全体ではなく、
watch()アップデート操作によって変更されたフィールドのデルタが返されます。fullDocumentを"updateLookup"watch()に設定すると、更新されたドキュメントの過半数がコミットした最新のバージョンを参照するよう に指示します。watch()fullDocumentは、updateDescriptionデルタに加えて、ドキュメント検索を含む フィールドを返します。MongoDB 6.0 以降では、
fullDocumentを次のように設定できます。"whenAvailable"これは、ドキュメントの挿入、置換、または更新後に、利用可能な場合、ドキュメントの変更後のイメージを出力するためです。"required"ドキュメントの挿入、置換、またはアップデート後に、ドキュメントの変更後のイメージを出力します。変更後のイメージが利用できない場合、エラーが発生します。
fullDocumentBeforeChangestring
任意。
MongoDB 6.0 以降では、新しい
fullDocumentBeforeChangeフィールドを使用して次のように設定できます。"whenAvailable"ドキュメントの置換、アップデート、または削除前に、利用可能な場合、ドキュメントの変更前のイメージを出力します。"required"これは、ドキュメントが置換、更新、削除される前に、ドキュメントの変更前のイメージを出力するためです。変更前のイメージが利用できない場合、エラーが発生します。"off"変更前のドキュメントのイメージを非表示にします。デフォルトは"off"です。
batchSize整数
任意。変更ストリームの各バッチで返されるドキュメントの最大数。デフォルトでは、
watch()の初期バッチサイズは、101件のドキュメントまたは 16 メビバイト(MiB)相当のドキュメントのうち小さい方です。後続のバッチの最大サイズは 16 メビバイトです。このオプションでは 16 メビバイトより小さい制限を強制できますが、それより大きい制限を強制することはできません。設定されている場合、batchSizeはbatchSizeドキュメントまたは 16 MiB 相当のドキュメントのうち小さい方になります。cursor.batchSize()と同じ機能を持ちます。maxAwaitTimeMS整数
任意。空のバッチを返す前に、新しいデータ変更が変更ストリーム カーソルに報告されるまでサーバーが待機する最大時間(ミリ秒)。
デフォルトは
1000ミリ秒です。collationドキュメント
任意。照合ドキュメントを渡し、変更ストリーム カーソルの照合順序を指定します。
省略する場合は
simpleバイナリ比較がデフォルトです。showExpandedEventsブール値
任意。MongoDB 6.0 以降、変更ストリームは、 createIndexes イベントや dropIndexes イベントなどの DDL イベントの変更通知をサポートします。展開されたイベントを変更ストリームに含めるには、
showExpandedEventsオプションを使用して変更ストリーム カーソルを作成します。バージョン 6.0 で追加。
startAtOperationTimeタイムスタンプ
任意。変更ストリームの開始点。過去の開始点を指定する場合、oplog の時間範囲内である必要があります。oplog の時間範囲を確認するには、
rs.printReplicationInfo()を参照してください。startAtOperationTimeは、resumeAfterおよびstartAfterと排他関係にあります。次の値を返します。 MongoDB 配置への接続が開いていて、コレクションが存在する限り開いたままのカーソル。変更イベント ドキュメントの例については、「変更イベント」を参照してください。 Tip
互換性
このメソッドは、次の環境でホストされている配置で使用できます。
MongoDB Atlas はクラウドでの MongoDB 配置のためのフルマネージド サービスです
注意
このコマンドは、すべての MongoDB Atlas クラスターでサポートされています。すべてのコマンドに対する Atlas のサポートについては、「サポートされていないコマンド」を参照してください。
MongoDB Enterprise: サブスクリプションベースの自己管理型 MongoDB バージョン
MongoDB Community: ソースが利用可能で、無料で使用できる自己管理型の MongoDB のバージョン
可用性
配置
db.collection.watch() は次のように、レプリカ セットおよびシャーディングされたクラスターの配置で利用できます。
レプリカセットの場合、データを保持している任意のメンバーに対して
db.collection.watch()を発行できます。シャーディングされたクラスターの場合、 インスタンスで
db.collection.watch()mongosを発行する必要があります。
ストレージ エンジン
読み取り保証 (read concern)majority サポート
変更ストリームは、 "majority"の読み取り保証 (read concern) のサポートに関係なく 使用できます 。つまり、変更ストリームを使用するには、読み取り保証(read concern) majorityサポートを有効にする(デフォルト)か無効にするかを選択できます。
動作
db.collection.watch()データを保持しているノードの大半に反映されたデータ変更についてのみ通知します。変更ストリーム カーソルは、次のいずれかが発生するまで開いたままになります。
再開可能性
MongoDBドライバーとは異なり、 mongoshはエラー後に変更ストリーム カーソルの再開を自動的に試行しません。 MongoDB ドライバーは、特定のエラーが発生後に 変更ストリーム カーソルの自動的な再開を1 回試行します。
db.collection.watch()は、oplog に保存されている情報を使用して変更イベントの説明を生成し、その操作に関連付けられた再開トークンを生成します。 resumeAfterまたはstartAfterオプションに渡される再開トークンによって識別される操作がすでにoplogから削除されている場合、 db.collection.watch()は変更ストリームを再開できません。
変更ストリームの再開の詳細については、「変更ストリームの再開」を参照してください。
注意
無効化イベント(コレクションの削除や名前の変更など)によってストリームが閉じられた後は、
resumeAfterを使用して変更ストリームを再開することはできません。 Instead, you can use startAfter to start a new change stream after an invalidate event.配置がシャーディングされたクラスターの場合、シャードを削除すると、開いている変更ストリームのカーソルが閉じることがあります。閉じた変更ストリームのカーソルは完全に再開できない場合があります。
注意
無効化イベント(コレクションの削除や名前の変更など)によってストリームが閉じられた後は、 resumeAfterを使用して変更ストリームを再開することはできません。 Instead, you can use startAfter to start a new change stream after an invalidate event.
アップデート 操作の完全なドキュメント検索
デフォルトでは、変更ストリーム カーソルはアップデート操作における特定のフィールドの変更またはデルタを返します。また、変更されたドキュメントのうち、過半数がコミットした最新のバージョンを検索して返すように変更ストリームを構成できます。アップデートと検索の間に他の書き込み操作が行われた場合、返されるドキュメントがアップデート実行時のドキュメントと大幅に異なる可能性があります。
アップデート操作中に適用された変更の数と完全なドキュメントのサイズによっては、アップデート操作における変更イベント ドキュメントのサイズが BSON ドキュメントの制限である 16 MB を超えるリスクがあります。サイズが超過した場合、サーバーで変更ストリーム カーソルが閉じられ、エラーが返されます。
アクセス制御
アクセス制御を使用して実行中の場合、ユーザーは コレクション リソースに対して find および changeStream の特権アクションを持っている必要があります。つまり、ユーザーは、次の特権を付与するロールを持っていなければなりません。
{ resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }
組み込みの read ロールにより、適切な権限が付与されます。
カーソルの反復
MongoDB には、カーソルを反復処理する方法が複数用意されています。
cursor.hasNext() メソッドの場合、ブロックして次のイベントを待機します。watchCursor カーソルを監視してイベントを反復処理するには、次のように hasNext() を使用します。
while (!watchCursor.isClosed()) { if (watchCursor.hasNext()) { firstChange = watchCursor.next(); break; } }
cursor.tryNext() メソッドはノンブロッキングです。watchCursor カーソルを監視してイベントを反復処理するには、次のように tryNext() を使用します。
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
例
変更ストリームを開く
次の操作は、 data.sensors コレクションに対し変更ストリーム カーソルを開きます。
watchCursor = db.getSiblingDB("data").sensors.watch()
カーソルを反復処理し、新しいイベントをチェックする。cursor.isClosed() メソッドを cursor.tryNext() メソッドと組み合わせて使用し、変更ストリーム カーソルが閉じられ、かつ最新のバッチにオブジェクトが残っていない場合にのみループが終了するようにします。
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
変更ストリーム出力の詳細なドキュメントについては、 「変更イベント」を参照してください。
注意
変更ストリームでは isExhausted() を使用できません。
変更ストリームで updateLookup に fullDocument オプションを使用する
fullDocument オプションを "updateLookup" に設定すると、アップデートされた変更ストリーム イベントに関連するドキュメントについて、過半数がコミットした最新のバージョンを参照するよう、変更ストリーム カーソルに指示します。
次の操作は、 fullDocument : "updateLookup"オプションを使用して、data.sensors コレクションに対し変更ストリーム カーソルを開きます。
watchCursor = db.getSiblingDB("data").sensors.watch( [], { fullDocument : "updateLookup" } )
カーソルを反復処理し、新しいイベントをチェックする。cursor.isClosed() メソッドを cursor.tryNext() メソッドと組み合わせて使用し、変更ストリーム カーソルが閉じられ、かつ最新のバッチにオブジェクトが残っていない場合にのみループが終了するようにします。
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
どのアップデート操作も、変更イベントはドキュメント検索の結果を fullDocument フィールドに返します。
fullDocumentのアップデート出力の例については、「変更ストリーム アップデート イベント」を参照してください。
変更ストリーム出力の詳細なドキュメントについては、 「変更イベント」を参照してください。
変更ストリームにおけるドキュメントの変更前と変更後のイメージ
MongoDB 6.0 以降では、変更ストリーム イベントを使用して、変更前と変更後のドキュメントのバージョン(変更前とイメージと変更後のイメージ)を出力できます。
変更前のイメージとは、置換、更新、または削除される前のドキュメントです。挿入されたドキュメントには、変更前のイメージはありません。
変更後のイメージとは、挿入、置換、または更新された後のドキュメントです。削除されたドキュメントには、変更後のイメージはありません。
、 、または
changeStreamPreAndPostImagesdb.createCollection()createcollModを使用し、コレクションに対して を有効にします。例、collModコマンドを使用する場合は次のようになります。db.runCommand( { collMod: <collection>, changeStreamPreAndPostImages: { enabled: true } } )
変更ストリーム イベントにおいて、次の条件に当てはまる場合、変更前と変更後のイメージは使用できません。
ドキュメントの更新または削除操作時に、コレクションにおいて有効になっていない場合。
expireAfterSecondsで設定した、変更前と変更後のイメージ保持時間が経過した後に削除された場合。次の例では、クラスター全体で
expireAfterSecondsを100秒に設定します。use admin db.runCommand( { setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 100 } } } } ) 注意
setClusterParameterコマンドはMongoDB Atlasクラスターではサポートされていません。すべてのコマンドに対する Atlas のサポートの詳細については、「 Atlas でサポートされていないコマンド 」を参照してください。次の例では、
expireAfterSecondsを含む現在のchangeStreamOptions設定を返します。db.adminCommand( { getClusterParameter: "changeStreamOptions" } ) expireAfterSecondsをoffに設定すると、デフォルトの保持ポリシーが適用されます。対応する変更ストリーム イベントがoplog から削除されるまで、変更前と変更後のイメージは保持されます。変更ストリーム イベントが oplog から削除されると、
expireAfterSecondsの変更前と変更後のイメージの保持時間にかかわらず、対応する変更前と変更後のイメージも削除されます。
その他の考慮事項
変更前と変更後のイメージを有効にすると、ストレージ容量が消費され、処理時間が増えます。変更前と変更後のイメージは、必要な場合のみ有効にしてください。
変更ストリーム イベントのサイズを 16 メビバイト未満に制限します。イベントのサイズを制限するには、次の方法があります。
ドキュメントのサイズを 8 MB に制限します。
updateDescriptionのような他の変更ストリーム イベントのフィールドがそれほど大きくない場合、変更ストリーム出力で変更前と変更後のイメージを同時にリクエストできます。updateDescriptionのような他の変更ストリーム イベントのフィールドが大きくない場合、最大 16 メビバイトのドキュメントの変更ストリーム出力では、変更後のイメージのみをリクエストします。次の場合、16 メビバイトまでのドキュメントの変更ストリーム出力で、変更前のイメージのみをリクエストします。
ドキュメントのアップデートがドキュメントの構造または内容のごく一部にしか影響しない場合、そして
replace変更イベントが発生しない場合。replaceイベントには、常に変更後のイメージが含まれます。
変更前イメージをリクエストするには、
db.collection.watch()でfullDocumentBeforeChangeをrequiredまたはwhenAvailableに設定します。変更後イメージをリクエストするには、同じ方法でfullDocumentを設定します。変更前のイメージは
config.system.preimagesコレクションに書き込まれます。config.system.preimagesコレクションが大きくなる場合があります。コレクションのサイズを制限するには、前述のとおり、変更前のイメージにexpireAfterSeconds時間を設定します。変更前のイメージはバックグラウンド プロセスによって非同期で削除されます。
重要
下位互換性のない機能
MongoDB 6.0 以降では、変更ストリームにドキュメントの変更前のイメージと変更後のイメージを使用している場合、以前の MongoDB バージョンにダウングレードする前に、collMod コマンドを使用して各コレクションの changeStreamPreAndPostImages を無効にする必要があります。
Tip
変更ストリーム イベントと出力については、「変更イベント」を参照してください。
コレクションの変更を監視するには、
db.collection.watch()を参照してください。変更ストリーム出力の完全な例については、「 Change Streams とドキュメントの変更前イメージおよび変更後イメージ 」を参照してください。
コレクションを作成
changeStreamPreAndPostImages が有効になっている temperatureSensor コレクションを作成します。
db.createCollection( "temperatureSensor", { changeStreamPreAndPostImages: { enabled: true } } )
temperatureSensor コレクションに読み取った温度の値を入力します。
db.temperatureSensor.insertMany( [ { "_id" : 0, "reading" : 26.1 }, { "_id" : 1, "reading" : 25.9 }, { "_id" : 2, "reading" : 24.3 }, { "_id" : 3, "reading" : 22.4 }, { "_id" : 4, "reading" : 24.6 } ] )
次のセクションでは、temperatureSensor コレクションを使用する、ドキュメントの変更前と変更後のイメージの変更ストリームの例を示します。
ドキュメントの変更前のイメージを使用する変更ストリーム
ドキュメントの変更前イメージが使用可能な場合は、fullDocumentBeforeChange: "whenAvailable" 設定を使用して出力します。変更前のイメージとは、置換、更新、または削除される前のドキュメントです。挿入されたドキュメントには、変更前のイメージはありません。
次の例では、fullDocumentBeforeChange:
"whenAvailable" を使用してtemperatureSensor コレクションの変更ストリーム カーソルを作成します。
watchCursorFullDocumentBeforeChange = db.temperatureSensor.watch( [], { fullDocumentBeforeChange: "whenAvailable" } )
次の例では、カーソルを使用して新しい変更ストリーム イベントを確認します。
while ( !watchCursorFullDocumentBeforeChange.isClosed() ) { if ( watchCursorFullDocumentBeforeChange.hasNext() ) { printjson( watchCursorFullDocumentBeforeChange.next() ); } }
この例では、次のことが行われます。
whileループはカーソルが閉じるまで実行されます。カーソルにドキュメントがある場合、
hasNext()はtrueを返します。
次の例では、temperatureSensor ドキュメントの reading フィールドをアップデートします。
db.temperatureSensor.updateOne( { _id: 2 }, { $set: { reading: 22.1 } } )
temperatureSensor ドキュメントがアップデートされると、変更イベントによってドキュメントの変更前のイメージが fullDocumentBeforeChange フィールドに出力されます。変更前のイメージには、アップデート前のtemperatureSensor ドキュメントの reading フィールドが含まれています。たとえば次のとおりです。
{ "_id" : { "_data" : "82624B21...", "_typeBits" : BinData(0,"QA==") }, "operationType" : "update", "clusterTime" : Timestamp(1649090957, 1), "ns" : { "db" : "test", "coll" : "temperatureSensor" }, "documentKey" : { "_id" : 2 }, "updateDescription" : { "updatedFields" : { "reading" : 22.1 }, "removedFields" : [ ], "truncatedArrays" : [ ] }, "fullDocumentBeforeChange" : { "_id" : 2, "reading" : 24.3 } }
Tip
ドキュメントのアップデート出力の詳細については、「変更ストリーム のアップデート イベント」を参照してください。
変更ストリームの出力の詳細については、「変更イベント」を参照してください。
ドキュメントの変更後のイメージを使用する変更ストリーム
ドキュメントの変更後のイメージが使用可能な場合は、fullDocument: "whenAvailable" 設定を使用して出力します。変更後のイメージとは、挿入、置換、または更新された後のドキュメントです。削除されたドキュメントには、変更後のイメージはありません。
次の例では、fullDocument:
"whenAvailable" を使用してtemperatureSensor コレクションの変更ストリーム カーソルを作成します。
watchCursorFullDocument = db.temperatureSensor.watch( [], { fullDocument: "whenAvailable" } )
次の例では、カーソルを使用して新しい変更ストリーム イベントを確認します。
while ( !watchCursorFullDocument.isClosed() ) { if ( watchCursorFullDocument.hasNext() ) { printjson( watchCursorFullDocument.next() ); } }
この例では、次のことが行われます。
whileループはカーソルが閉じるまで実行されます。カーソルにドキュメントがある場合、
hasNext()はtrueを返します。
次の例では、temperatureSensor ドキュメントの reading フィールドをアップデートします。
db.temperatureSensor.updateOne( { _id: 1 }, { $set: { reading: 29.5 } } )
temperatureSensor ドキュメントがアップデートされると、変更イベントによってドキュメントの変更後のイメージが fullDocument フィールドに出力されます。変更後のイメージには、アップデート後のtemperatureSensor ドキュメントの reading フィールドが含まれています。たとえば次のとおりです。
{ "_id" : { "_data" : "8262474D...", "_typeBits" : BinData(0,"QA==") }, "operationType" : "update", "clusterTime" : Timestamp(1648840090, 1), "fullDocument" : { "_id" : 1, "reading" : 29.5 }, "ns" : { "db" : "test", "coll" : "temperatureSensor" }, "documentKey" : { "_id" : 1 }, "updateDescription" : { "updatedFields" : { "reading" : 29.5 }, "removedFields" : [ ], "truncatedArrays" : [ ] } }
Tip
ドキュメントのアップデート出力の詳細については、「変更ストリーム のアップデート イベント」を参照してください。
変更ストリームの出力の詳細については、「変更イベント」を参照してください。
変更ストリームで集約パイプライン フィルターを使用する
注意
MongoDB 4.2 以降では、変更ストリームの集計パイプラインでイベントの _id フィールドが変更される場合、変更ストリームで例外がスローされるようになります。
次の操作は、集約パイプラインを使用して data.sensors コレクションに対する変更ストリーム カーソルを開き、insert イベントのみをフィルタリングします。
watchCursor = db.getSiblingDB("data").sensors.watch( [ { $match : {"operationType" : "insert" } } ] )
カーソルを反復処理し、新しいイベントをチェックする。cursor.isClosed() メソッドを cursor.hasNext() メソッドと組み合わせて使用し、変更ストリーム カーソルが閉じられ、かつ最新のバッチにオブジェクトが残っていない場合にのみループが終了するようにします。
while (!watchCursor.isClosed()){ if (watchCursor.hasNext()){ printjson(watchCursor.next()); } }
変更ストリーム カーソルは、 operationType が insert である変更イベントのみを返します。変更ストリームの出力における詳細なドキュメントについては、「変更イベント」を参照してください。
変更ストリームの再開
変更ストリーム カーソルによって返されるすべてのドキュメントには、_id フィールドとして再開トークンが含まれます。変更ストリームを再開するには、再開する変更イベントの _id ドキュメント全体を watch() の resumeAfter または startAfter オプションに渡します。
次の操作では、再開トークンを使用して data.sensors コレクションに対する変更ストリーム カーソルを再開します。再開トークンを生成した操作が、クラスターの oplog からロール オフされていないことを前提としています。
let watchCursor = db.getSiblingDB("data").sensors.watch(); let firstChange; while (!watchCursor.isClosed()) { if (watchCursor.hasNext()) { firstChange = watchCursor.next(); break; } } watchCursor.close(); let resumeToken = firstChange._id; resumedWatchCursor = db.getSiblingDB("data").sensors.watch( [], { resumeAfter : resumeToken } )
カーソルを反復処理し、新しいイベントをチェックする。cursor.isClosed() メソッドを cursor.hasNext() メソッドと組み合わせて使用し、変更ストリーム カーソルが閉じられ、かつ最新のバッチにオブジェクトが残っていない場合にのみループが終了するようにします。
while (!resumedWatchCursor.isClosed()){ if (resumedWatchCursor.hasNext()){ print(resumedWatchCursor.next()); } }
変更ストリームの再開に関する詳細なドキュメントについては、「変更ストリームの再開」を参照してください。