Provided by: libmongoc-doc_1.24.3-1_all bug

NAME

       mongoc_change_stream_t - handle to a MongoDB change stream

SYNOPSIS

          #include <mongoc/mongoc.h>

          typedef struct _mongoc_change_stream_t mongoc_change_stream_t;

       mongoc_change_stream_t  is  a handle to a change stream. A collection change stream can be
       obtained using mongoc_collection_watch().

       It is recommended to use a mongoc_change_stream_t and  its  functions  instead  of  a  raw
       aggregation  with a $changeStream stage. For more information see the MongoDB Manual Entry
       on Change Streams.

EXAMPLE

       example-collection-watch.c

          #include <mongoc/mongoc.h>

          /* Opens a change stream on db.coll, inserts one document using a majority
           * write concern, then prints every change event the stream returns.
           *
           * Returns EXIT_SUCCESS if the stream ends without reporting an error and
           * EXIT_FAILURE otherwise. Every failure path jumps to the cleanup label so
           * that all resources are released before the program exits. */
          int
          main (void)
          {
             int exit_code = EXIT_FAILURE;
             bson_t empty = BSON_INITIALIZER;
             bson_t opts = BSON_INITIALIZER;
             bson_t *to_insert = NULL;
             const bson_t *doc;
             const bson_t *err_doc;
             bson_error_t error;
             const char *uri_string;
             mongoc_uri_t *uri = NULL;
             mongoc_client_t *client = NULL;
             mongoc_collection_t *coll = NULL;
             mongoc_change_stream_t *stream = NULL;
             mongoc_write_concern_t *wc = NULL;
             bool r;

             mongoc_init ();

             /* allocate driver objects only after the driver has been initialized. */
             to_insert = BCON_NEW ("x", BCON_INT32 (1));
             wc = mongoc_write_concern_new ();

             uri_string = "mongodb://"
                          "localhost:27017,localhost:27018,localhost:"
                          "27019/db?replicaSet=rs0";

             uri = mongoc_uri_new_with_error (uri_string, &error);
             if (!uri) {
                fprintf (stderr,
                         "failed to parse URI: %s\n"
                         "error message:       %s\n",
                         uri_string,
                         error.message);
                goto cleanup;
             }

             client = mongoc_client_new_from_uri (uri);
             if (!client) {
                goto cleanup;
             }

             coll = mongoc_client_get_collection (client, "db", "coll");
             /* an empty pipeline and NULL options watch every change on db.coll. */
             stream = mongoc_collection_watch (coll, &empty, NULL);

             mongoc_write_concern_set_wmajority (wc, 10000);
             mongoc_write_concern_append (wc, &opts);
             r = mongoc_collection_insert_one (coll, to_insert, &opts, NULL, &error);
             if (!r) {
                fprintf (stderr, "Error: %s\n", error.message);
                goto cleanup;
             }

             while (mongoc_change_stream_next (stream, &doc)) {
                char *as_json = bson_as_relaxed_extended_json (doc, NULL);
                fprintf (stderr, "Got document: %s\n", as_json);
                bson_free (as_json);
             }

             /* mongoc_change_stream_next() returns false both when no more events
              * are available and when an error occurred; tell the two apart. */
             if (mongoc_change_stream_error_document (stream, &error, &err_doc)) {
                if (!bson_empty (err_doc)) {
                   /* the JSON string is heap allocated and must be freed. */
                   char *err_json = bson_as_relaxed_extended_json (err_doc, NULL);
                   fprintf (stderr, "Server Error: %s\n", err_json);
                   bson_free (err_json);
                } else {
                   fprintf (stderr, "Client Error: %s\n", error.message);
                }
                goto cleanup;
             }

             exit_code = EXIT_SUCCESS;

          cleanup:
             bson_destroy (to_insert);
             mongoc_write_concern_destroy (wc);
             bson_destroy (&opts);
             bson_destroy (&empty);
             mongoc_change_stream_destroy (stream);
             mongoc_collection_destroy (coll);
             mongoc_uri_destroy (uri);
             mongoc_client_destroy (client);
             mongoc_cleanup ();

             return exit_code;
          }

   Starting and Resuming
       All watch functions accept several options to indicate where a change stream should  start
       returning changes from: resumeAfter, startAfter, and startAtOperationTime.

       All  changes  returned  by  mongoc_change_stream_next()  include a resume token in the _id
       field. MongoDB 4.2 also includes an  additional  resume  token  in  each  "aggregate"  and
       "getMore"  command response, which points to the end of that response's batch. The current
       token is automatically cached by libmongoc. In the event of an error,  libmongoc  attempts
       to  recreate  the  change  stream  starting where it left off by passing the cached resume
       token. libmongoc only attempts to resume once, but  client  applications  can  access  the
       cached  resume token with mongoc_change_stream_get_resume_token() and use it for their own
       resume logic by passing it as either the resumeAfter or startAfter option.

       Additionally, change streams can start returning changes at an operation time by using the
       startAtOperationTime  field. This can be the timestamp returned in the operationTime field
       of a command reply.

       resumeAfter, startAfter, and startAtOperationTime are mutually exclusive options.  Setting
       more than one will result in a server error.

       The  following  example implements custom resuming logic, persisting the resume token in a
       file.

       example-resume.c

          #include <mongoc/mongoc.h>

          /* An example implementation of custom resume logic in a change stream.
          * example-resume starts a client-wide change stream and persists the resume
          * token in a file "resume-token.json". On restart, if "resume-token.json"
          * exists, the change stream starts watching after the persisted resume token.
          *
          * This behavior allows a user to exit example-resume, and restart it later
          * without missing any change events.
          */
          #include <unistd.h>

          /* Path of the file in which the most recent resume token is stored, so
           * that it can be read back the next time the program starts. */
          static const char *RESUME_TOKEN_PATH = "resume-token.json";

          /* Persists the resume token of a change event to RESUME_TOKEN_PATH.
           *
           * doc is a change document returned by mongoc_change_stream_next(); its
           * "_id" field holds the resume token. The token is written as the
           * canonical extended JSON document { "resumeAfter": <resume token> } so
           * that it can later be appended to the watch options unchanged.
           *
           * Returns true if the token was written completely, false otherwise (a
           * diagnostic is printed to stderr). */
          static bool
          _save_resume_token (const bson_t *doc)
          {
             FILE *file_stream;
             bson_iter_t iter;
             bson_t resume_token_doc;
             char *as_json = NULL;
             size_t as_json_len = 0;
             size_t n_written;
             const bson_value_t *resume_token;
             bool ok = false;

             if (!bson_iter_init_find (&iter, doc, "_id")) {
                fprintf (stderr, "change document does not contain _id.\n");
                return false;
             }
             resume_token = bson_iter_value (&iter);

             /* store the resume token in a document, { resumeAfter: <resume token> }
              * which we can later append easily. Serialize before opening the file
              * so a serialization failure does not truncate a previously saved
              * token. */
             bson_init (&resume_token_doc);
             BSON_APPEND_VALUE (&resume_token_doc, "resumeAfter", resume_token);
             as_json = bson_as_canonical_extended_json (&resume_token_doc, &as_json_len);
             bson_destroy (&resume_token_doc);
             if (!as_json) {
                fprintf (stderr, "failed to serialize resume token\n");
                return false;
             }

             file_stream = fopen (RESUME_TOKEN_PATH, "w+");
             if (!file_stream) {
                fprintf (stderr, "failed to open %s for writing\n", RESUME_TOKEN_PATH);
                bson_free (as_json);
                return false;
             }

             /* fwrite() reports failure through a short item count, never through
              * a negative value, so compare the count with the requested length. */
             n_written = fwrite (as_json, sizeof (char), as_json_len, file_stream);
             if (n_written != as_json_len) {
                fprintf (stderr, "failed to write to %s\n", RESUME_TOKEN_PATH);
             } else {
                ok = true;
             }

             bson_free (as_json);
             /* buffered data is flushed by fclose(), so a failure here means the
              * token may not have reached the file. */
             if (fclose (file_stream) != 0 && ok) {
                fprintf (stderr, "failed to write to %s\n", RESUME_TOKEN_PATH);
                ok = false;
             }
             return ok;
          }

          /* Loads a previously saved resume token, if any, into opts.
           *
           * If RESUME_TOKEN_PATH is not readable, or contains no document, there is
           * nothing to resume from: opts is left untouched and true is returned.
           * Otherwise the document stored in the file is appended to opts.
           *
           * Returns false (after printing a diagnostic to stderr) if the file
           * exists but cannot be opened or parsed, or if its contents cannot be
           * appended to opts. */
          bool
          _load_resume_token (bson_t *opts)
          {
             bson_error_t error;
             bson_json_reader_t *reader;
             bson_t doc;
             int read_status;
             bool ok = false;

             /* if the file does not exist, skip. */
             if (-1 == access (RESUME_TOKEN_PATH, R_OK)) {
                return true;
             }
             reader = bson_json_reader_new_from_file (RESUME_TOKEN_PATH, &error);
             if (!reader) {
                fprintf (stderr,
                         "failed to open %s for reading: %s\n",
                         RESUME_TOKEN_PATH,
                         error.message);
                return false;
             }

             bson_init (&doc);
             read_status = bson_json_reader_read (reader, &doc, &error);
             if (read_status < 0) {
                fprintf (stderr,
                         "failed to read doc from %s: %s\n",
                         RESUME_TOKEN_PATH,
                         error.message);
             } else if (read_status == 0) {
                /* the file holds no document, so there is no token to resume from. */
                ok = true;
             } else if (!bson_concat (opts, &doc)) {
                fprintf (stderr,
                         "failed to append resume token from %s to options\n",
                         RESUME_TOKEN_PATH);
             } else {
                printf ("found cached resume token in %s, resuming change stream.\n",
                        RESUME_TOKEN_PATH);
                ok = true;
             }

             bson_destroy (&doc);
             bson_json_reader_destroy (reader);
             return ok;
          }

          int
          main (void)
          {
             int exit_code = EXIT_FAILURE;
             const char *uri_string;
             mongoc_uri_t *uri = NULL;
             bson_error_t error;
             mongoc_client_t *client = NULL;
             bson_t pipeline = BSON_INITIALIZER;
             bson_t opts = BSON_INITIALIZER;
             mongoc_change_stream_t *stream = NULL;
             const bson_t *doc;

             const int max_time = 30; /* max amount of time, in seconds, that
                                         mongoc_change_stream_next can block. */

             mongoc_init ();
             uri_string = "mongodb://localhost:27017/db?replicaSet=rs0";
             uri = mongoc_uri_new_with_error (uri_string, &error);
             if (!uri) {
                fprintf (stderr,
                         "failed to parse URI: %s\n"
                         "error message:       %s\n",
                         uri_string,
                         error.message);
                goto cleanup;
             }

             client = mongoc_client_new_from_uri (uri);
             if (!client) {
                goto cleanup;
             }

             if (!_load_resume_token (&opts)) {
                goto cleanup;
             }
             BSON_APPEND_INT64 (&opts, "maxAwaitTimeMS", max_time * 1000);

             printf ("listening for changes on the client (max %d seconds).\n", max_time);
             stream = mongoc_client_watch (client, &pipeline, &opts);

             while (mongoc_change_stream_next (stream, &doc)) {
                char *as_json;

                as_json = bson_as_canonical_extended_json (doc, NULL);
                printf ("change received: %s\n", as_json);
                bson_free (as_json);
                if (!_save_resume_token (doc)) {
                   goto cleanup;
                }
             }

             exit_code = EXIT_SUCCESS;

          cleanup:
             mongoc_uri_destroy (uri);
             bson_destroy (&pipeline);
             bson_destroy (&opts);
             mongoc_change_stream_destroy (stream);
             mongoc_client_destroy (client);
             mongoc_cleanup ();
             return exit_code;
          }

       The following example shows using startAtOperationTime to synchronize a change stream with
       another operation.

       example-start-at-optime.c

          /* An example of starting a change stream with startAtOperationTime. */
          #include <mongoc/mongoc.h>

          int
          main (void)
          {
             int exit_code = EXIT_FAILURE;
             const char *uri_string;
             mongoc_uri_t *uri = NULL;
             bson_error_t error;
             mongoc_client_t *client = NULL;
             mongoc_collection_t *coll = NULL;
             bson_t pipeline = BSON_INITIALIZER;
             bson_t opts = BSON_INITIALIZER;
             mongoc_change_stream_t *stream = NULL;
             bson_iter_t iter;
             const bson_t *doc;
             bson_value_t cached_operation_time = {0};
             int i;
             bool r;

             mongoc_init ();
             uri_string = "mongodb://localhost:27017/db?replicaSet=rs0";
             uri = mongoc_uri_new_with_error (uri_string, &error);
             if (!uri) {
                fprintf (stderr,
                         "failed to parse URI: %s\n"
                         "error message:       %s\n",
                         uri_string,
                         error.message);
                goto cleanup;
             }

             client = mongoc_client_new_from_uri (uri);
             if (!client) {
                goto cleanup;
             }

             /* insert five documents. */
             coll = mongoc_client_get_collection (client, "db", "coll");
             for (i = 0; i < 5; i++) {
                bson_t reply;
                bson_t *insert_cmd = BCON_NEW ("insert",
                                               "coll",
                                               "documents",
                                               "[",
                                               "{",
                                               "x",
                                               BCON_INT64 (i),
                                               "}",
                                               "]");

                r = mongoc_collection_write_command_with_opts (
                   coll, insert_cmd, NULL, &reply, &error);
                bson_destroy (insert_cmd);
                if (!r) {
                   bson_destroy (&reply);
                   fprintf (stderr, "failed to insert: %s\n", error.message);
                   goto cleanup;
                }
                if (i == 0) {
                   /* cache the operation time in the first reply. */
                   if (bson_iter_init_find (&iter, &reply, "operationTime")) {
                      bson_value_copy (bson_iter_value (&iter), &cached_operation_time);
                   } else {
                      fprintf (stderr, "reply does not contain operationTime.");
                      bson_destroy (&reply);
                      goto cleanup;
                   }
                }
                bson_destroy (&reply);
             }

             /* start a change stream at the first returned operationTime. */
             BSON_APPEND_VALUE (&opts, "startAtOperationTime", &cached_operation_time);
             stream = mongoc_collection_watch (coll, &pipeline, &opts);

             /* since the change stream started at the operation time of the first
              * insert, the five inserts are returned. */
             printf ("listening for changes on db.coll:\n");
             while (mongoc_change_stream_next (stream, &doc)) {
                char *as_json;

                as_json = bson_as_canonical_extended_json (doc, NULL);
                printf ("change received: %s\n", as_json);
                bson_free (as_json);
             }

             exit_code = EXIT_SUCCESS;

          cleanup:
             mongoc_uri_destroy (uri);
             bson_destroy (&pipeline);
             bson_destroy (&opts);
             if (cached_operation_time.value_type) {
                bson_value_destroy (&cached_operation_time);
             }
             mongoc_change_stream_destroy (stream);
             mongoc_collection_destroy (coll);
             mongoc_client_destroy (client);
             mongoc_cleanup ();
             return exit_code;
          }

AUTHOR

       MongoDB, Inc

COPYRIGHT

       2017-present, MongoDB, Inc