Third Light Developer Exchange

Code and templating community forum for developers and software integrators


#1 2017-04-25 16:21:54

thirdlight
Third Light Staff
Registered: 2013-06-06
Posts: 19

Walk-Thru: using AMQP integration to watch for new uploads

Third Light servers with the API module now support AMQP publishing. The main purpose is to let you watch for new uploads in real time, without polling, and run middleware that acts on those uploads in whatever way you need.

Advanced Message Queuing Protocol (AMQP - https://www.amqp.org/) is a standard for message-oriented middleware. It allows messages to be queued and routed between publishers and subscribers. Your Third Light server can act as a publisher and notify subscribers when a new file is uploaded. This is helpful if you need to do work whenever new files arrive, for example triggering events or metadata changes which affect other systems. The main benefit of AMQP is that there is no need to poll repeatedly for new files: the message queue alerts you to these changes in real time. AMQP is commonly used in middleware and integration solutions which rely on timely and efficient interactions, and we strongly recommend it over repeatedly polling the Third Light API for changes.
In the current release of Third Light, the only AMQP events which are published are for new file uploads.
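To make the publisher/subscriber routing described above concrete, here is a toy, in-memory model of a direct exchange. It is only a sketch for illustration: the directExchange type and its methods are invented for this example and are not part of RabbitMQ, Third Light or any AMQP library.

```go
package main

import "fmt"

// directExchange is a toy, in-memory model of an AMQP direct exchange.
// It is NOT how a real broker is implemented; it only illustrates the idea
// of routing a message to the queues bound under a matching key.
type directExchange struct {
	bindings map[string][]chan string // binding key -> bound queues
}

func newDirectExchange() *directExchange {
	return &directExchange{bindings: make(map[string][]chan string)}
}

// bind attaches a queue to the exchange under a binding key.
func (e *directExchange) bind(key string, queue chan string) {
	e.bindings[key] = append(e.bindings[key], queue)
}

// publish copies msg to every queue whose binding key equals the routing
// key, and reports how many queues received it.
func (e *directExchange) publish(key, msg string) int {
	queues := e.bindings[key]
	for _, q := range queues {
		q <- msg
	}
	return len(queues)
}

func main() {
	ex := newDirectExchange()
	uploads := make(chan string, 8) // buffered so publish does not block
	ex.bind("upload", uploads)

	n := ex.publish("upload", `{"action":"AssetUpload"}`)
	fmt.Println("delivered to", n, "queue(s):", <-uploads)

	// No queue is bound under this key, so nothing receives the message.
	fmt.Println("delivered to", ex.publish("other", "ignored"), "queue(s)")
}
```

The point to take away is that the subscriber never asks "is there anything new?"; it simply waits on its queue, and the exchange places matching messages there.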

Here is an example of how you can integrate Third Light, RabbitMQ (a popular implementation of an AMQP server) and a simple test client, written in Go, to report on new uploads. This is intended to be a minimum viable demonstrator which you could build upon.

Firstly, you will need a Linux computer (or virtual machine). We use Debian Linux at Third Light, and all of our services work is programmed in the Go programming language. For this demo, we'll need two packages: RabbitMQ and Go.

apt-get install rabbitmq-server golang

This will install a basic RabbitMQ server and the Go programming language, plus any dependencies. You can then add a RabbitMQ user account with liberal permissions, as follows:

rabbitmqctl add_user thirdlight top_secret_password
rabbitmqctl set_permissions thirdlight ".*" ".*" ".*"

These permissions are likely to be too broad for a production system, so please read the RabbitMQ documentation on access control before deploying.

Next, let's check the Third Light server can send messages to your RabbitMQ server. For this to be possible, remember that your Third Light server must have a TCP/IP route to the host where you are running RabbitMQ (and you must make sure that traffic isn't filtered by a firewall or similar).
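If you want a quick way to test the network path, a small Go programme can attempt a plain TCP connection to the RabbitMQ port. This is only a sketch: checkReachable and the -addr flag are names made up for this example, and it tests reachability from whichever machine you run it on, so for a meaningful result that machine should sit on the same side of any firewall as your Third Light server.

```go
package main

import (
	"flag"
	"log"
	"net"
	"time"
)

// dialTimeout bounds how long we wait for name resolution and the TCP handshake.
const dialTimeout = 3 * time.Second

// checkReachable opens and immediately closes a TCP connection to addr
// ("host:port"). A nil result only shows that the port accepts connections;
// it says nothing about AMQP credentials or permissions.
func checkReachable(addr string) error {
	conn, err := net.DialTimeout("tcp", addr, dialTimeout)
	if err != nil {
		return err
	}
	return conn.Close()
}

func main() {
	// amqp.example.com:5672 is the placeholder host used in this walk-through.
	addr := flag.String("addr", "amqp.example.com:5672", "RabbitMQ host:port to test")
	flag.Parse()

	if err := checkReachable(*addr); err != nil {
		log.Printf("cannot reach %s: %v", *addr, err)
		return
	}
	log.Printf("%s accepts TCP connections", *addr)
}
```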

If our RabbitMQ server is on the hostname "amqp.example.com" and using the default port of 5672, then the configuration is as follows on the Configuration > Site Options > API > AMQP Notifications page in Third Light:

  • Server address: amqp.example.com

  • Server port: 5672

  • Connect over SSL: leave unchecked

  • Username: thirdlight

  • Password: top_secret_password

  • Exchange Name: thirdlight

  • Exchange Type: Direct

  • Declare a durable exchange: checked

  • Message Key: upload

For help with these settings, please see: https://www.thirdlight.com/docs/display … ifications
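For reference, the same host, port and credentials are what an AMQP subscriber combines into a single connection URI. The sketch below shows one way to build that URI in Go. buildAMQPURI is a hypothetical helper, not part of any library, and using the "amqps" scheme when SSL is enabled is an assumption about your client library which you should check against its documentation.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
)

// buildAMQPURI assembles an amqp:// URI (or amqps:// when useTLS is true)
// from the values entered on the AMQP Notifications page.
func buildAMQPURI(host, port, user, password string, useTLS bool) string {
	scheme := "amqp"
	if useTLS {
		scheme = "amqps"
	}
	u := url.URL{
		Scheme: scheme,
		User:   url.UserPassword(user, password), // escapes reserved characters such as '@'
		Host:   net.JoinHostPort(host, port),
		Path:   "/",
	}
	return u.String()
}

func main() {
	fmt.Println(buildAMQPURI("amqp.example.com", "5672", "thirdlight", "top_secret_password", false))
}
```

Building the URI with net/url rather than string concatenation means a password containing characters like '@' or '/' is escaped correctly.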

When you save your settings, they will be validated. If an error unexpectedly prevents you from committing the changes, check the logs in /var/log/rabbitmq on your Debian Linux server for low-level information.

If your changes are committed successfully, then every upload you make will now be published to the "thirdlight" Exchange, with a Message Key of "upload". How do we look at messages on the queue? Let's use a small Go programme to do this. Save this as consumer.go:

// Scroll to see the entire program before you cut and paste!
package main

import (
        "flag"
        "fmt"
        "github.com/streadway/amqp"
        "log"
        "time"
)

var (
        uri          = flag.String("uri", "amqp://thirdlight:top_secret_password@amqp.example.com:5672/", "AMQP URI")
        exchange     = flag.String("exchange", "thirdlight", "Durable, non-auto-deleted AMQP exchange name")
        exchangeType = flag.String("exchange-type", "direct", "Exchange type - direct|fanout|topic|x-custom")
        queue        = flag.String("queue", "/", "AMQP queue name (declared durable below)")
        bindingKey   = flag.String("key", "upload", "AMQP binding key")
        consumerTag  = flag.String("consumer-tag", "simple-consumer", "AMQP consumer tag (should not be blank)")
        lifetime     = flag.Duration("lifetime", 0*time.Second, "lifetime of process before shutdown (0s=infinite)")
)

func init() {
        flag.Parse()
}

func main() {
        c, err := NewConsumer(*uri, *exchange, *exchangeType, *queue, *bindingKey, *consumerTag)
        if err != nil {
                log.Fatalf("%s", err)
        }

        if *lifetime > 0 {
                log.Printf("running for %s", *lifetime)
                time.Sleep(*lifetime)
        } else {
                log.Printf("running forever")
                select {}
        }

        log.Printf("shutting down")

        if err := c.Shutdown(); err != nil {
                log.Fatalf("error during shutdown: %s", err)
        }
}

type Consumer struct {
        conn    *amqp.Connection
        channel *amqp.Channel
        tag     string
        done    chan error
}

func NewConsumer(amqpURI, exchange, exchangeType, queueName, key, ctag string) (*Consumer, error) {
        c := &Consumer{
                conn:    nil,
                channel: nil,
                tag:     ctag,
                done:    make(chan error),
        }

        var err error

        log.Printf("dialing %q", amqpURI)
        c.conn, err = amqp.Dial(amqpURI)
        if err != nil {
                return nil, fmt.Errorf("Dial: %s", err)
        }

        go func() {
                log.Printf("closing: %s", <-c.conn.NotifyClose(make(chan *amqp.Error)))
        }()

        log.Printf("got Connection, getting Channel")
        c.channel, err = c.conn.Channel()
        if err != nil {
                return nil, fmt.Errorf("Channel: %s", err)
        }

        log.Printf("got Channel, declaring Exchange (%q)", exchange)
        if err = c.channel.ExchangeDeclare(
                exchange,     // name of the exchange
                exchangeType, // type
                true,         // durable
                false,        // delete when complete
                false,        // internal
                false,        // noWait
                nil,          // arguments
        ); err != nil {
                return nil, fmt.Errorf("Exchange Declare: %s", err)
        }

        log.Printf("declared Exchange, declaring Queue %q", queueName)
        queue, err := c.channel.QueueDeclare(
                queueName, // name of the queue
                true,      // durable
                false,     // delete when unused
                false,     // exclusive
                false,     // noWait
                nil,       // arguments
        )
        if err != nil {
                return nil, fmt.Errorf("Queue Declare: %s", err)
        }

        log.Printf("declared Queue (%q %d messages, %d consumers), binding to Exchange (key %q)",
                queue.Name, queue.Messages, queue.Consumers, key)

        if err = c.channel.QueueBind(
                queue.Name, // name of the queue
                key,        // bindingKey
                exchange,   // sourceExchange
                false,      // noWait
                nil,        // arguments
        ); err != nil {
                return nil, fmt.Errorf("Queue Bind: %s", err)
        }

        log.Printf("Queue bound to Exchange, starting Consume (consumer tag %q)", c.tag)
        deliveries, err := c.channel.Consume(
                queue.Name, // name
                c.tag,      // consumerTag,
                false,      // noAck
                false,      // exclusive
                false,      // noLocal
                false,      // noWait
                nil,        // arguments
        )
        if err != nil {
                return nil, fmt.Errorf("Queue Consume: %s", err)
        }

        go handle(deliveries, c.done)

        return c, nil
}

func (c *Consumer) Shutdown() error {
        // will close() the deliveries channel
        if err := c.channel.Cancel(c.tag, true); err != nil {
                return fmt.Errorf("Consumer cancel failed: %s", err)
        }

        if err := c.conn.Close(); err != nil {
                return fmt.Errorf("AMQP connection close error: %s", err)
        }

        defer log.Printf("AMQP shutdown OK")

        // wait for handle() to exit
        return <-c.done
}

func handle(deliveries <-chan amqp.Delivery, done chan error) {
        for d := range deliveries {
                log.Printf(
                        "got %dB delivery: [%v] %q",
                        len(d.Body),
                        d.DeliveryTag,
                        d.Body,
                )
                d.Ack(false)
        }
        log.Printf("handle: deliveries channel closed")
        done <- nil
}

To run this Go programme, fetch the dependencies and run it as follows:

go get
go run consumer.go

You will see something like this, if consumer.go has successfully subscribed to the Exchange:

2017/04/25 16:16:32 dialing "amqp://thirdlight:top_secret_password@amqp.example.com:5672/"
2017/04/25 16:16:32 got Connection, getting Channel
2017/04/25 16:16:32 got Channel, declaring Exchange ("thirdlight")
2017/04/25 16:16:32 declared Exchange, declaring Queue "/"
2017/04/25 16:16:32 declared Queue ("/" 0 messages, 0 consumers), binding to Exchange (key "upload")
2017/04/25 16:16:32 Queue bound to Exchange, starting Consume (consumer tag "simple-consumer")
2017/04/25 16:16:32 running forever

The consumer.go programme will run until you interrupt it with Control-C.

Now, when you upload a file, you will see this form of output:

2017/04/25 16:16:51 got 57B delivery: [1] "{\"action\":\"AssetUpload\",\"data\":{\"assetId\":\"41851834255\"}}"

This message indicates that a new file has been uploaded, with the Third Light Asset ID of 41851834255.

To do useful work at this stage, you could build on handle() in the Go programme. Its loop body runs once for each delivery that consumer.go receives. You could begin by calling json.Unmarshal() on the message body, for example, and from there add your own business logic (perhaps setting metadata, moving the file, or sending an alert to Slack).
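As a starting point, here is a minimal sketch of decoding the message with json.Unmarshal(). The uploadEvent struct and parseUploadEvent function are names invented for this example, and the struct models only the fields visible in the sample message above; real messages may carry fields that this sketch ignores.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// uploadEvent mirrors the sample message shown in this walk-through.
// Only the fields visible in that sample are modelled (an assumption).
type uploadEvent struct {
	Action string `json:"action"`
	Data   struct {
		AssetID string `json:"assetId"`
	} `json:"data"`
}

// parseUploadEvent decodes a delivery body and rejects any action other
// than "AssetUpload", the only action seen in the sample message.
func parseUploadEvent(body []byte) (uploadEvent, error) {
	var ev uploadEvent
	if err := json.Unmarshal(body, &ev); err != nil {
		return ev, fmt.Errorf("decode delivery: %w", err)
	}
	if ev.Action != "AssetUpload" {
		return ev, fmt.Errorf("unexpected action %q", ev.Action)
	}
	return ev, nil
}

func main() {
	body := []byte(`{"action":"AssetUpload","data":{"assetId":"41851834255"}}`)
	ev, err := parseUploadEvent(body)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("new upload, asset ID", ev.Data.AssetID)
}
```

Inside handle(), you would call parseUploadEvent(d.Body) before acknowledging the delivery, and decide for yourself whether a message that fails to decode should be acknowledged or rejected.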

If you need help with AMQP or have a project in mind, but no internal resources ready to use, we offer Professional Services. Please drop a note to services@thirdlight.com to start the conversation!

Best wishes,
Michael & the Third Light Professional Services team
