An example of an event-driven application based on webhooks in the Mail.ru Cloud Solutions S3 object storage



Rube Goldberg coffee machine



Event-driven architecture makes resource use more cost-efficient, because resources are consumed only when they are needed. There are many ways to implement it without creating additional cloud entities to act as worker applications. Today I will talk not about FaaS but about webhooks, and walk through a tutorial example of handling events with Object Storage webhooks.



A few words about object storage and webhooks. Object storage lets you store any data in the cloud as objects accessible over HTTP/HTTPS via S3 or another API (depending on the implementation). Webhooks are, in general, user-defined HTTP callbacks. They are usually triggered by an event, such as code being pushed to a repository or a comment being posted on a blog. When the event occurs, the source site sends an HTTP request to the URL specified for the webhook. As a result, events on one site can trigger actions on another (wiki). When the source site is an object storage, the events are changes to its contents.



Examples of simple cases when such automation can be used:



  1. . Β« Β», .
  2. , , .
  3. ( , , , ).
  4. , , Kubernetes, , .


As an example, we will implement a variant of task 1: changes in a Mail.ru Cloud Solutions (MCS) object storage bucket are synchronized to AWS object storage using webhooks. In a real high-load case you should make the processing asynchronous by placing incoming webhooks in a queue, but for this tutorial we will do without it.



Scheme of work



The communication protocol is described in detail in the S3 webhooks guide on MCS. The scheme of work has the following elements:



  • A publishing service that sits on the S3 side and sends HTTP requests when a webhook fires.
  • A webhook receiving server that listens for requests from the publishing service and takes the appropriate action. The server can be written in any language; in our example it is written in Go.


A peculiarity of the webhook implementation in the S3 API is that the webhook receiving server must be registered with the publishing service. In particular, the receiving server must confirm its subscription to the publishing service's messages (other webhook implementations usually do not require such confirmation).



Accordingly, the webhook receiving server must support two main operations:



  • respond to the publishing service's request to confirm registration,
  • process incoming events.


Installing the server for receiving webhooks



To run the webhook receiving server, you need a Linux server. In this article, as an example, we use a virtual instance that we deploy to MCS.



Install the required software and launch the webhook server.



ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install git
Reading package lists... Done
Building dependency tree
Reading state information... Done
The following packages were automatically installed and are no longer required:
  bc dns-root-data dnsmasq-base ebtables landscape-common liblxc-common 
liblxc1 libuv1 lxcfs lxd lxd-client python3-attr python3-automat 
python3-click python3-constantly python3-hyperlink
  python3-incremental python3-pam python3-pyasn1-modules 
python3-service-identity python3-twisted python3-twisted-bin 
python3-zope.interface uidmap xdelta3
Use 'sudo apt autoremove' to remove them.
Suggested packages:
  git-daemon-run | git-daemon-sysvinit git-doc git-el git-email git-gui 
gitk gitweb git-cvs git-mediawiki git-svn
The following NEW packages will be installed:
  git
0 upgraded, 1 newly installed, 0 to remove and 46 not upgraded.
Need to get 3915 kB of archives.
After this operation, 32.3 MB of additional disk space will be used.
Get:1 http://MS1.clouds.archive.ubuntu.com/ubuntu bionic-updates/main amd64 git amd64 1:2.17.1-1ubuntu0.7 [3915 kB]
Fetched 3915 kB in 1s (5639 kB/s)
Selecting previously unselected package git.
(Reading database ... 53932 files and directories currently installed.)
Preparing to unpack .../git_1%3a2.17.1-1ubuntu0.7_amd64.deb ...
Unpacking git (1:2.17.1-1ubuntu0.7) ...
Setting up git (1:2.17.1-1ubuntu0.7) ...


Clone the repository with the webhook receiving server:



ubuntu@ubuntu-basic-1-2-10gb:~$ git clone https://github.com/RomanenkoDenys/s3-webhook.git
Cloning into 's3-webhook'...
remote: Enumerating objects: 48, done.
remote: Counting objects: 100% (48/48), done.
remote: Compressing objects: 100% (27/27), done.
remote: Total 114 (delta 20), reused 45 (delta 18), pack-reused 66
Receiving objects: 100% (114/114), 23.77 MiB | 20.25 MiB/s, done.
Resolving deltas: 100% (49/49), done.


Let's start the server:



ubuntu@ubuntu-basic-1-2-10gb:~$ cd s3-webhook/
ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80


Subscribing to the publishing service



You can register your server for receiving webhooks via API or web interface. For simplicity, we will register through the web interface:



  1. Go to the buckets section in the control panel.
  2. Open the bucket for which you want to set up webhooks and click on the gear:






Go to the Webhooks tab and click Add:





Fill in the fields:







ID - the name of the webhook.



Event - which events to send. We chose to send all events that occur when working with files (adding and deleting).



URL - address of the webhook receiving server.



Filter prefix / suffix - a filter that generates webhooks only for objects whose names match certain rules. For example, to make the webhook fire only for files with the .png extension, write "png" in the Filter suffix field.



Currently, only ports 80 and 443 are supported for accessing the webhook receiving server.



Click Add hook and see the following:





Hook added.



The logs of the webhook receiving server show the progress of the hook registration:



ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80
2020/06/15 12:01:14 [POST] incoming HTTP request from 95.163.216.92:42530
2020/06/15 12:01:14 Got timestamp: 2020-06-15T15:01:13+03:00 TopicArn: mcs5259999770|myfiles-ash|s3:ObjectCreated:*,s3:ObjectRemoved:* Token: E2itMqAMUVVZc51pUhFWSp13DoxezvRxkUh5P7LEuk1dEe9y URL: http://89.208.199.220/webhook
2020/06/15 12:01:14 Generate responce signature: 3754ce36636f80dfd606c5254d64ecb2fd8d555c27962b70b4f759f32c76b66d


Registration is complete. In the next section, we will take a closer look at how the webhook receiving server works.



Description of the server for receiving webhooks



In our example, the server is written in Go. Let's analyze the basic principles of its work.



package main

import (
    "flag"
    "fmt"
    "log"
    "net/http"
    "strconv"
)

// Path to the external script, set by the -script flag
var actionScript string

// Generate hmac_sha256_hex
func HmacSha256hex(message string, secret string) string {
}

// Generate hmac_sha256
func HmacSha256(message string, secret string) string {
}

// Send subscription confirmation
func SubscriptionConfirmation(w http.ResponseWriter, req *http.Request, body []byte) {
}

// Process incoming event records
func GotRecords(w http.ResponseWriter, req *http.Request, body []byte) {
}

// Liveness probe
func Ping(w http.ResponseWriter, req *http.Request) {
    // log request
    log.Printf("[%s] incoming HTTP Ping request from %s\n", req.Method, req.RemoteAddr)
    fmt.Fprintf(w, "Pong\n")
}

// Webhook
func Webhook(w http.ResponseWriter, req *http.Request) {
}

func main() {

    // get command line args
    bindPort := flag.Int("port", 80, "number between 1-65535")
    bindAddr := flag.String("address", "", "ip address in dot format")
    flag.StringVar(&actionScript, "script", "", "external script to execute")
    flag.Parse()

    // register URL handlers
    http.HandleFunc("/ping", Ping)
    http.HandleFunc("/webhook", Webhook)

    log.Fatal(http.ListenAndServe(*bindAddr+":"+strconv.Itoa(*bindPort), nil))
}


Let's consider the main functions:



  • Ping() is the handler for the /ping URL, the simplest implementation of a liveness probe.
  • Webhook() is the main handler, serving the /webhook URL:

    • confirms registration on the publishing service (passes control to the SubscriptionConfirmation function),
    • processes incoming webhooks (the GotRecords function).
  • The HmacSha256 and HmacSha256hex functions implement the HMAC-SHA256 algorithm; HmacSha256hex returns the result as a hexadecimal string. They are used to compute the signature.
  • main is the main function; it processes command line parameters and registers URL handlers.


Command line parameters accepted by the server:



  • -port is the port on which the server will listen.
  • -address is the IP address that the server will listen to.
  • -script is an external program that is called on every hook that comes in.


Let's take a closer look at some of the functions:



//Webhook
func Webhook(w http.ResponseWriter, req *http.Request) {

    // Read body
    body, err := ioutil.ReadAll(req.Body)
    defer req.Body.Close()
    if err != nil {
        http.Error(w, err.Error(), 500)
        return
    }

    // log request
    log.Printf("[%s] incoming HTTP request from %s\n", req.Method, req.RemoteAddr)
    // check if we got subscription confirmation request
    if strings.Contains(string(body), "\"Type\":\"SubscriptionConfirmation\"") {
        SubscriptionConfirmation(w, req, body)
    } else {
        GotRecords(w, req, body)
    }

}


This function determines what has arrived: a request to confirm registration or a webhook. According to the documentation, a registration confirmation arrives as a POST request with the following JSON structure:



POST http://test.com HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation
content-type: application/json

{
    "Timestamp":"2019-12-26T19:29:12+03:00",
    "Type":"SubscriptionConfirmation",
    "Message":"You have chosen to subscribe to the topic $topic. To confirm the subscription you need to response with calculated signature",
    "TopicArn":"mcs2883541269|bucketA|s3:ObjectCreated:Put",
    "SignatureVersion":1,
    "Token":"RPE5UuG94rGgBH6kHXN9FUPugFxj1hs2aUQc99btJp3E49tA"
}


The server must answer this request with:



content-type: application/json

{"signature":"ea3fce4bb15c6de4fec365d36bcebbc34ccddf54616d5ca12e1972f82b6d37af"}


Where the signature is calculated as:



signature = hmac_sha256(url, hmac_sha256(TopicArn, hmac_sha256(Timestamp, Token)))
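
As an illustration, the formula above could be sketched in Go roughly as follows. This is not the code from the repository but a minimal sketch under stated assumptions: the first argument of hmac_sha256 is treated as the message and the second as the key (matching the (message, secret) parameter order of the server's helper functions), the intermediate digests are passed on as raw bytes, and only the final digest is hex-encoded. The names hmacSHA256, confirmationSignature and confirmationBody are hypothetical.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// hmacSHA256 returns the raw HMAC-SHA256 digest of message keyed with secret.
func hmacSHA256(message, secret []byte) []byte {
	mac := hmac.New(sha256.New, secret)
	mac.Write(message)
	return mac.Sum(nil)
}

// confirmationSignature chains three HMAC-SHA256 calls following
// hmac_sha256(url, hmac_sha256(TopicArn, hmac_sha256(Timestamp, Token))).
// Assumption: first argument is the message, second is the key; the inner
// digests are used as raw bytes and only the outer digest is hex-encoded.
func confirmationSignature(url, topicArn, timestamp, token string) string {
	k1 := hmacSHA256([]byte(timestamp), []byte(token))
	k2 := hmacSHA256([]byte(topicArn), k1)
	return hex.EncodeToString(hmacSHA256([]byte(url), k2))
}

// confirmationBody builds the JSON document returned to the publishing service.
func confirmationBody(signature string) ([]byte, error) {
	return json.Marshal(map[string]string{"signature": signature})
}

func main() {
	// Values taken from the sample confirmation request shown earlier.
	sig := confirmationSignature(
		"http://test.com",
		"mcs2883541269|bucketA|s3:ObjectCreated:Put",
		"2019-12-26T19:29:12+03:00",
		"RPE5UuG94rGgBH6kHXN9FUPugFxj1hs2aUQc99btJp3E49tA",
	)
	body, err := confirmationBody(sig)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}
```

If the publishing service rejects the signature, the first things to check are the argument order and the encoding of the intermediate digests, since both are assumptions here.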


If a webhook arrives, then the structure of the Post request looks like this:



POST <url> HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation

{ "Records":
    [
        {
            "s3": {
                "object": {
                    "eTag":"aed563ecafb4bcc5654c597a421547b2",
                    "sequencer":1577453615,
                    "key":"some-file-to-bucket",
                    "size":100
                },
                "configurationId":"1",
                "bucket": {
                    "name": "bucketA",
                    "ownerIdentity": {
                        "principalId":"mcs2883541269"
                    }
                },
                "s3SchemaVersion":"1.0"
            },
            "eventVersion":"1.0",
            "requestParameters":{
                "sourceIPAddress":"185.6.245.156"
            },
            "userIdentity": {
                "principalId":"2407013e-cbc1-415f-9102-16fb9bd6946b"
            },
            "eventName":"s3:ObjectCreated:Put",
            "awsRegion":"ru-msk",
            "eventSource":"aws:s3",
            "responseElements": {
                "x-amz-request-id":"VGJR5rtJ"
            }
        }
    ]
}


So the handler has to decide how to process the data depending on the request. As the indicator I chose the entry "Type":"SubscriptionConfirmation", since it is present in the subscription confirmation request and absent from the webhook. Depending on whether this entry is present in the POST request, execution continues in either the SubscriptionConfirmation function or the GotRecords function.
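
The substring check depends on the exact spacing of the JSON text. One possible alternative, shown here only as a sketch and not as the approach used in the repository, is to decode the body and compare the Type field. The names envelope and isSubscriptionConfirmation are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope holds only the field needed to tell the two request kinds apart.
type envelope struct {
	Type string `json:"Type"`
}

// isSubscriptionConfirmation reports whether body is a subscription
// confirmation request. Malformed JSON is treated as "not a confirmation".
func isSubscriptionConfirmation(body []byte) bool {
	var e envelope
	if err := json.Unmarshal(body, &e); err != nil {
		return false
	}
	return e.Type == "SubscriptionConfirmation"
}

func main() {
	// Note the space after the colon: a plain substring match on
	// "Type":"SubscriptionConfirmation" would miss this body.
	confirm := []byte(`{"Type": "SubscriptionConfirmation", "Token": "example-token"}`)
	event := []byte(`{"Records": [{"eventName": "s3:ObjectCreated:Put"}]}`)

	fmt.Println(isSubscriptionConfirmation(confirm)) // prints "true"
	fmt.Println(isSubscriptionConfirmation(event))   // prints "false"
}
```

The trade-off is an extra JSON decode per request in exchange for not depending on how the publishing service formats its output.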



We will not consider the SubscriptionConfirmation function in detail; it is implemented according to the principles set out in the documentation. You can find the source code of this function in the project's git repository.



The GotRecords function parses the incoming request and, for each Record object, calls an external script (whose name was passed in the -script parameter) with the parameters:



  • bucket name
  • object key
  • action:

    • copy - if in the original request EventName = ObjectCreated | PutObject | PutObjectCopy
    • delete - if in the original request EventName = ObjectRemoved | DeleteObject


Thus, if a hook arrives with the POST request described above and the server was started with -script=script.sh, the script will be called as follows:



script.sh  bucketA some-file-to-bucket copy
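
The parsing and mapping described above could look roughly like the following sketch. It is an illustration, not the repository's GotRecords: the type notification and the functions actionFor and scriptArgs are hypothetical, and matching event names by substring is an assumption about how the rules in the list are applied.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// notification mirrors only the fields of the webhook body used here.
type notification struct {
	Records []struct {
		EventName string `json:"eventName"`
		S3        struct {
			Bucket struct {
				Name string `json:"name"`
			} `json:"bucket"`
			Object struct {
				Key string `json:"key"`
			} `json:"object"`
		} `json:"s3"`
	} `json:"Records"`
}

// actionFor maps an event name to the action passed to the script.
// It returns "" for events that match neither rule.
func actionFor(eventName string) string {
	switch {
	case strings.Contains(eventName, "ObjectCreated"),
		strings.Contains(eventName, "PutObject"): // also covers PutObjectCopy
		return "copy"
	case strings.Contains(eventName, "ObjectRemoved"),
		strings.Contains(eventName, "DeleteObject"):
		return "delete"
	}
	return ""
}

// scriptArgs returns one argument list (bucket, key, action) per record.
func scriptArgs(body []byte) ([][]string, error) {
	var n notification
	if err := json.Unmarshal(body, &n); err != nil {
		return nil, err
	}
	var out [][]string
	for _, r := range n.Records {
		action := actionFor(r.EventName)
		if action == "" {
			continue // unknown event: nothing to run
		}
		out = append(out, []string{r.S3.Bucket.Name, r.S3.Object.Key, action})
	}
	return out, nil
}

func main() {
	body := []byte(`{"Records":[{"s3":{"object":{"key":"some-file-to-bucket"},` +
		`"bucket":{"name":"bucketA"}},"eventName":"s3:ObjectCreated:Put"}]}`)
	args, err := scriptArgs(body)
	if err != nil {
		panic(err)
	}
	for _, a := range args {
		// A real server would pass these to exec.Command(script, a...).
		fmt.Println(strings.Join(a, " ")) // prints "bucketA some-file-to-bucket copy"
	}
}
```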


It should be understood that this webhook receiving server is not a complete production solution, but a simplified example of a possible implementation.



Example of work



Let's synchronize the files of the main bucket in MCS to a backup bucket in AWS. The main bucket is called myfiles-ash and the backup bucket myfiles-backup (configuring a bucket in AWS is outside the scope of this article). When a file is placed in the main bucket, a copy should appear in the backup bucket; when a file is deleted from the main bucket, it should be deleted from the backup bucket as well.



We will work with the buckets using the awscli utility, which is compatible with both MCS and AWS cloud storage.



ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install awscli
Reading package lists... Done
Building dependency tree
Reading state information... Done
After this operation, 34.4 MB of additional disk space will be used.
Unpacking awscli (1.14.44-1ubuntu1) ...
Setting up awscli (1.14.44-1ubuntu1) ...


Let's configure access to the S3 MCS API:



ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile mcs
AWS Access Key ID [None]: hdywEPtuuJTExxxxxxxxxxxxxx
AWS Secret Access Key [None]: hDz3SgxKwXoxxxxxxxxxxxxxxxxxx
Default region name [None]:
Default output format [None]:


Let's configure access to the AWS S3 API:



ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile aws
AWS Access Key ID [None]: AKIAJXXXXXXXXXXXX
AWS Secret Access Key [None]: dfuerphOLQwu0CreP5Z8l5fuXXXXXXXXXXXXXXXX
Default region name [None]:
Default output format [None]:


Let's check access:



To AWS:



ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile aws
2020-07-06 08:44:11 myfiles-backup


For MCS, add --endpoint-url when running the command:



ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile mcs --endpoint-url https://hb.bizmrg.com
2020-02-04 06:38:05 databasebackups-0cdaaa6402d4424e9676c75a720afa85
2020-05-27 10:08:33 myfiles-ash


Access works.



Now let's write a script for handling the incoming hook and call it s3_backup_mcs_aws.sh:



#!/bin/bash
# Requires the aws cli
# If a file is added, copy it to the backup bucket
# If a file is removed, remove it from the backup bucket

# Variables
ENDPOINT_MCS="https://hb.bizmrg.com"
# These two are expanded unquoted below on purpose, so that they split
# into the command and its arguments
AWSCLI_MCS="$(which aws) --endpoint-url ${ENDPOINT_MCS} --profile mcs s3"
AWSCLI_AWS="$(which aws) --profile aws s3"
BACKUP_BUCKET="myfiles-backup"

# Arguments passed by the webhook server: bucket name, object key, action
SOURCE_BUCKET="${1}"
SOURCE_FILE="${2}"
ACTION="${3}"

SOURCE="s3://${SOURCE_BUCKET}/${SOURCE_FILE}"
TARGET="s3://${BACKUP_BUCKET}/${SOURCE_FILE}"
TEMP="/tmp/${SOURCE_BUCKET}/${SOURCE_FILE}"

case "${ACTION}" in
    "copy")
    # Download from MCS to a temporary file, upload it to AWS, then clean up
    ${AWSCLI_MCS} cp "${SOURCE}" "${TEMP}"
    ${AWSCLI_AWS} cp "${TEMP}" "${TARGET}"
    rm "${TEMP}"
    ;;

    "delete")
    ${AWSCLI_AWS} rm "${TARGET}"
    ;;

    *)
    echo "Usage: ${0} sourcebucket sourcefile copy/delete"
    exit 1
    ;;
esac


We start the server:



ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80 -script scripts/s3_backup_mcs_aws.sh


Let's check how it works. Through the MCS web interface, add the file test.txt to the myfiles-ash bucket. The console logs show that a request was made to the webhook server:



2020/07/06 09:43:08 [POST] incoming HTTP request from 95.163.216.92:56612
download: s3://myfiles-ash/test.txt to ../../../tmp/myfiles-ash/test.txt
upload: ../../../tmp/myfiles-ash/test.txt to s3://myfiles-backup/test.txt


Let's check the contents of the myfiles-backup bucket in AWS:



ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls myfiles-backup
2020-07-06 09:43:10       1104 test.txt


Now, through the web interface, delete the file from the myfiles-ash bucket.



Server logs:



2020/07/06 09:44:46 [POST] incoming HTTP request from 95.163.216.92:58224
delete: s3://myfiles-backup/test.txt


Bucket content:



ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls myfiles-backup
ubuntu@ubuntu-basic-1-2-10gb:~$


File deleted, problem solved.



Conclusion and ToDo



All the code used in this article is in my repository. It also contains sample scripts and examples of computing signatures for registering webhooks.



This code is nothing more than an example of how you can use S3 webhooks in your work. As I said at the beginning, if you plan to use such a server in production, you should at least rewrite it to work asynchronously: put incoming webhooks in a queue (RabbitMQ or NATS) and have worker applications take them from there and process them. Otherwise, when webhooks arrive en masse, the server may run out of resources to perform the tasks. Queues also let you run the server and the workers separately and retry tasks after failures. It is also desirable to make the logging more detailed and more standardized.
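
To make the idea concrete, here is a minimal in-process sketch of the queue-and-workers pattern. It deliberately uses a buffered Go channel as a stand-in for an external broker such as RabbitMQ or NATS, so it shows only how the handler and the workers are decoupled, not durability or retries. The names task and startWorkers are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// task is one unit of work derived from a webhook record.
type task struct {
	Bucket, Key, Action string
}

// startWorkers launches n goroutines that drain queue and call handle for
// every task. The returned WaitGroup is done once queue is closed and empty.
func startWorkers(n int, queue <-chan task, handle func(task)) *sync.WaitGroup {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range queue {
				handle(t)
			}
		}()
	}
	return &wg
}

func main() {
	// The buffer absorbs bursts; the HTTP handler would only enqueue
	// a task and answer the publishing service immediately.
	queue := make(chan task, 100)

	var mu sync.Mutex
	processed := 0
	wg := startWorkers(4, queue, func(t task) {
		// A real worker would run the external script here.
		mu.Lock()
		processed++
		mu.Unlock()
	})

	for i := 0; i < 10; i++ {
		queue <- task{"myfiles-ash", fmt.Sprintf("file-%d.txt", i), "copy"}
	}
	close(queue)
	wg.Wait()
	fmt.Println("processed:", processed) // prints "processed: 10"
}
```

Unlike an external queue, a channel loses its contents when the process restarts, which is exactly the gap that RabbitMQ or NATS would close.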



Good luck!


