--- title: "Getting started" author: "Andrea Dodet" date: "`r Sys.Date()`" vignette: > %\VignetteIndexEntry{googlePubsubR} %\VignetteEngine{knitr::rmarkdown} %\VignetteEncoding{UTF-8} --- # `googlePubsubR` This vignette will provide a walk through for the most common functions that will cover the majority of use cases. ## Authentication In order to authenticate the client, the following environment variables will need to be set: * `GCP_AUTH_FILE`: path to the .json file containing GCP credentials for a service account enabled to interact with the Pub/Sub API. You can also directly pass the path as a function argument. * `GCP_PROJECT`: your GCP project id. Upon setting the environment variables, it is just a matter of calling: ```r pubsub_auth() ``` More authentication methods can be found in the `?pubsub_auth` documentation. ## Creating and getting resources ### Topics In their most basic form, topics can be created in the following way: ```r # Create a topic that retains messages for a minimum of 4 hours. # We'll attach some cutom labels to make it easier to spot topic <- topics_create(name = "vignette-topic", message_retention_duration = 14400) # A topic object is returned topic # $labels # $labels$type # [1] "pkg_vignette" # # # $name # [1] "projects//topics/vignette-topic" # # $kmsKeyName # NULL # # $satisfiesPzs # NULL # # $messageStoragePolicy # NULL # # $schemaSettings # NULL # # $messageRetentionDuration # [1] "14400s" # # attr(,"class") # [1] "Topic" "list" ``` It is possible to interact with pre-existing topic objects. For instance, one could retrieve a `Topic` object from an existing topic: ```r if(topics_exists("vignette-topic")) { topic <- topics_get("vignette-topic") } ``` In order to consume messages from a topic, a subscription will be needed: ```r # You can either pass a Topic object or a topic name sub <- subscriptions_create( name = "vignette-sub", topic = topic, # Messages will expire after 3 days of inactivity (no messages acked) expiration_policy = ExpirationPolicy(86400), # We'll retain unacked messages for 12 hours msg_retention_duration = 43200, # We'll retry message delivery with at least 1 second delay from the previous try retry_policy = RetryPolicy(min_backoff = 1) ) sub # $deadLetterPolicy # NULL # # $messageRetentionDuration # [1] "82400s" # # $labels # NULL # # $retryPolicy # $retryPolicy$minimumBackoff # [1] "1s" # # $retryPolicy$maximumBackoff # [1] "600s" # # # $pushConfig # named list() # # $ackDeadlineSeconds # [1] 10 # # $expirationPolicy # $expirationPolicy$ttl # [1] "86400s" # # # $filter # NULL # # $detached # NULL # # $retainAckedMessages # NULL # # $topic # [1] "projects//topics/vignette-topic" # # $name # [1] "projects//subscriptions/vignette-sub" # # $enableMessageOrdering # NULL # # $topicMessageRetentionDuration # [1] "14400s" # # attr(,"class") # [1] "Subscription" "list" ``` We can also inspect all subscriptions attached to a given topic. ```r topics_list_subscriptions(topic = "vignette-topic") ``` ### Schemas Schemas can be used to enforce a specific format on incoming messages, this will force all malformed messages to be discarded or sent to a dead letter queue. A schema object can be passed to a topic upon creation. Getting the schema definition in the right format (Pub/Sub expects it as a string) can be quite fiddly. In this example we'll define an AVRO schema. ```r avro_schema <- schemas_create( name = "vignette-schema", type = "AVRO", # toJSON as the API expects a string containing the definition of the AVRO schema definition = toJSON(list( name = "cutlery", type = "record", fields = list( list("name" = "name", "type" = "string"), list("name" = "price", "type" = "int") ) ), auto_unbox = T) ) # Test a message against the schema msg <- list( name = "John", price = 123 ) %>% toJSON(auto_unbox = T) %>% charToRaw() %>% base64enc::base64encode() %>% PubsubMessage() schemas_validate_message(schema = "vignette-schema", message = msg, encoding = "JSON") # [1] TRUE # Create a new topic and attach the schema topic <- topics_create( name = "vignette-topic", message_retention_duration = 14400, schema_settings = SchemaSettings(encoding = "JSON", "vignette-schema") ) ``` ### Snapshots and `seek` There are two ways to `seek` a subscription back in time: * Provide a snapshot of the subscription * Via a timestamp in RFC3339 UTC "Zulu" format ```r # Create a snapshot of a subscription snapshot <- snapshots_create(name = "vignette-snap", subscription = "vignette-sub") # 'Rewind' the subscription to the snapshot we've just created subscriptions_seek(subscription = "vignette-sub", snapshot = snapshot) # [1] TRUE # Seek the subscription to a specific timestamp subscriptions_seek("vignette-sub", time = "2021-11-08T23:55:00Z") ``` ## Messages Pubsub messages are expected encoded as base64 strings, depending on the object you're dealing with, the process to convert them in the right format might vary. Below we'll convert a dataframe in a format that will not upset Pubsub that much: ```r msg <- data.frame(name = "fork", price = 999) %>% as.list() %>% toJSON(auto_unbox = TRUE) %>% charToRaw() %>% base64enc::base64encode() %>% PubsubMessage() topics_publish(messages = msg, topic = 'vignette-topic')) ``` Now that we have successfully published a message (conforming to the schema specified above) we can pull messages from the subscription. Note that pulling will not take messages out of the queue (you can do it as many times you want). Messages will need to be acknowledged with `subscriptions_ack` after they have been successfully consumed in order to be successfully taken out of the queue. Pulling messages will return a list containing a `receivedMessages` dataframe containing ackIds and a dataframe storing and message data and information. ```r msgs <- subscriptions_pull("vignette-sub") tibble::glimpse(msgs) List of 1 $ receivedMessages:'data.frame': 1 obs. of 2 variables: ..$ ackId : chr "PkVTRFAGFixdRkhRNxkIaFEOT14jPzUgKEUSC1MTUVx1A1MQaVwzdQdRDRlzejV1aQwRVAsUUHRfURsfWVxEjczJsS9QXWJxa1oQAwJHUH1YUxw"| __truncated__ ..$ message:'data.frame': 1 obs. of 4 variables: .. ..$ data : chr "eyJuYW1lIjoiZm9yayIsInByaWNlIjo5OTl9" .. ..$ attributes :'data.frame': 1 obs. of 2 variables: .. ..$ messageId : chr "3410320233787713" .. ..$ publishTime: chr "2021-11-09T09:06:31.171Z" tibble::glimpse(msgs$receivedMessages$message) Rows: 1 Columns: 4 $ data "eyJuYW1lIjoiZm9yayIsInByaWNlIjo5OTl9" $ attributes $ messageId "3410320233787713" $ publishTime "2021-11-09T09:06:31.171Z ``` ### Decoding incoming messages In order to re-convert back message data into an usable format we'll basically need to reverse the process that was used to encode them in the first place. Given that `subscriptions_pull` will return **all** messages in the queue, a good strategy might be to decode them all at once using `lapply` ```r decoded_msg <- lapply(msgs$receivedMessages$message$data, function(msg) { msg %>% base64decode() %>% rawToChar() %>% fromJSON(flatten = TRUE, simplifyDataFrame = TRUE) %>% as.data.frame() }) %>% do.call(rbind, .) decoded_msg # name price # 1 fork 999 # ... Do something with the dataframe # Acknowledge we have succesfully used it subscriptions_ack(ack_ids = msgs$receivedMessages$ackId, subscription = "vignette-sub") # [1] TRUE ``` This approach is quite cumbersome and helpers to facilitate this process might be included in the library later on when a general enough approach will be found.