Disque


Prelims

This post is about my experience setting up a three-node Disque cluster on Google Compute Engine. I use or reference the following technologies:

  • Disque – a queue implementation by antirez based on Redis
  • Disq – a Python client library for Disque
  • RQ – a Python task queue with Redis
  • Terraform – a tool by HashiCorp for deploying VMs in various cloud infrastructures
  • Ansible – a tool to provision VMs via SSH, installing any necessary dependencies and software to ensure the target VM is in a specific state
  • Supervisor – a process control system like init.d, but better

Disque

We currently have a REST API that uses RQ to launch asynchronous jobs for text analysis. For instance, a simple curl localhost:8000/analyze?text=... will create a job, and the user then polls the server to get the job results. However, RQ uses Python for both the job producers and the job consumers (workers), and this coupling makes it difficult to write producers and consumers in different languages. Moving forward, we wanted to use Node.js to serve our REST API, but many of our workers had already been written in Python. This meant we had to find an alternative to RQ.

While we did consider using standard messaging libraries such as ZeroMQ to build our own queue, I really liked the plug-and-play nature of RQ: simply ensure that all consumers and producers have network access to Redis. Many of the other libraries we looked at, such as Celery or Gearman, were also too complex for our simple use case. The last thing we considered was using Redis directly as a queue, designing proper access around its LPOP and RPUSH commands. However, since antirez had already built a queue from the ground up on top of Redis technology, we decided to give Disque a shot. It has the same benefit as Redis: consumers and producers only need network access. Its only downside: it is extremely cutting-edge technology, with only a slim chance of long-term support or future development.
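
For comparison, the hand-rolled Redis approach would have looked something like the following minimal sketch with the redis-py client (the queue name and payload format are placeholders of mine; BLPOP is just the blocking variant of LPOP):

import json

import redis

r = redis.Redis()

def enqueue(record):
    # producer: RPUSH appends the job to the tail of the 'jobs' list
    r.rpush('jobs', json.dumps(record))

def dequeue():
    # consumer: BLPOP blocks until a job shows up at the head of the list
    _, payload = r.blpop('jobs')
    return json.loads(payload)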

Still, cutting-edge technology has never stopped a curious mind. So, let’s dive in.

Terraform and Ansible

I wanted my code to be easily deployed and modified, so I provisioned virtual machines using Terraform and Ansible. With Terraform, I spun up three instances on Google Compute Engine. After setting up my GCE account, here’s the Terraform file I used (via terraform apply):

resource "google_compute_instance" "disque-node" {
  name          = "disque-${count.index}"
  description   = "The VM hosting a disque server"
  machine_type  = "n1-standard-1"
  zone          = "us-central1-a"
  tags          = ["disque"]

  disk {
    image = "debian-8-jessie-v20150915"
    size  = 100 # for disque persistence
  }

  # this is for ansible to gain access to the machines
  metadata {
    sshKeys = "ansible:${file("keys/gce.pub")}"
  }

  network_interface {
    network = "default"
    access_config {}
  }

  count = 3
}

Note that I use the metadata to add the SSH keys stored in a local keys/ folder to the machines so that Ansible can SSH into them later.

Using ansible-playbook, I installed Disque and Supervisor to monitor the disque-server process on each machine. I used the dynamic inventory script from CiscoCloud so that Terraform and Ansible would play nice. (I’m hoping Terraform will gain native Ansible support in the future.) Here’s my Ansible playbook, which I ran with ansible-playbook -i terraform.py -s -u ansible --private-key=PATH/TO/PRIVATE/KEY disque.yml:

- hosts: all
  tasks:
    - name: Create disque node groups
      group_by: key={{ ansible_hostname | regex_replace('-[\d]+', '')}}

- hosts: disque-*
  tasks:
    - name: Install git
      apt: pkg=git state=installed update_cache=true

    - name: Install build tools
      apt: pkg=build-essential state=installed update_cache=true

    - name: Clone disque source
      git: repo=https://github.com/antirez/disque.git dest=/home/ansible/disque

    - name: Build and install disque
      command: make install chdir=/home/ansible/disque

    - name: Install supervisord
      apt: pkg=supervisor state=installed update_cache=true

    - name: Synchronize supervisor configuration
      synchronize: src=disque.conf dest=/etc/supervisor/conf.d/disque.conf
      notify:
        - Restart disque

    - name: Enable supervisord
      service: name=supervisor enabled=yes

    - name: Start supervisord
      service: name=supervisor state=started

  handlers:
    - name: Restart disque
      service: name=supervisor state=restarted

- hosts: disque-0
  tasks:
    - name: Configure disque cluster meets
      command: disque cluster meet {{ hostvars[item].ansible_eth0.ipv4.address }} 7711
      with_items: "{{ groups.disque }}"
      when: "'disque' in item and item != 'disque-0'"

The very last task in my Ansible file tells one of the Disque nodes to meet the others and join them into a cluster. Once I got to this step, I had a running Disque cluster on Google Compute Engine!
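
To sanity-check the cluster at this point, one option is to ask any node for its view of the membership via the HELLO command. Disque speaks the Redis protocol, so a plain redis-py connection can send it; here’s a rough sketch (the node address is a placeholder, and note that recent redis-py releases give HELLO special treatment as the RESP3 handshake, so a client from Disque’s era is assumed):

import redis

# Disque listens on port 7711 and speaks the Redis protocol, so redis-py can
# send raw commands to it. Replace the host with one of the nodes' addresses.
node = redis.Redis(host='NODE_IP', port=7711)

# HELLO returns the protocol version, the local node id, and one entry per
# known node: (id, ip, port, priority). With three nodes meeting correctly,
# three entries should show up here.
print(node.execute_command('HELLO'))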

Doing some work

Finally, I used some simple Python code to test my cluster. For submitting jobs, I used the Python Disque client disq. Here’s a rough sketch of a job client (in py3):

import disq
import json

conn = disq.Disque()

def process_job(record):
    # enqueue on the shared 'worker' queue; addjob returns the job id
    job_id = conn.addjob('worker', json.dumps(record))
    print('submitted job {}'.format(job_id))

    # wait for result
    result = get_result(job_id)
    return json.dumps(result)

def get_result(job_id):
    # block on the per-job result queue (workers reply on a queue named
    # after the job id)
    qname, result_id, result = conn.getjob(job_id)[0]
    conn.fastack(result_id)
    return (qname, result_id, json.loads(result.decode('utf8')))

if __name__ == "__main__":
    i = 0
    while True:
        print(process_job({'work': i}))
        i += 1

Obviously, this code could be made parallel; as is, it is serial and blocks until the job completes. It should be trivial to break get_result out into a separate thread (or greenlet) in Python. The biggest challenge I ran into was figuring out how to return the results of a job. With RQ, we could write the result into Redis and check the key (i.e., the job id), but with Disque, the only means of communication I had was queues. My initial solution was to put all the results on a single results queue, but that only works if the job client doesn’t care which job’s results it receives (any result will do). Instead, at the suggestion of my coworkers, I made a separate queue for each job id, which did the trick.
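
As a rough illustration of that threading idea (a sketch, not something I ran in production), the blocking submit-and-wait can be pushed onto a thread pool. I’m assuming here that sharing a single disq connection across threads is not safe, so each task opens its own:

import json
from concurrent.futures import ThreadPoolExecutor

import disq

def submit_and_wait(record):
    # one connection per task, to avoid sharing a socket across threads
    conn = disq.Disque()
    job_id = conn.addjob('worker', json.dumps(record))
    # block on the per-job result queue, then acknowledge the result
    qname, result_id, result = conn.getjob(job_id)[0]
    conn.fastack(result_id)
    return json.loads(result.decode('utf8'))

if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(submit_and_wait, {'work': i}) for i in range(10)]
        for future in futures:
            print(future.result())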

In a separate process, my workers were implemented as follows:

import disq
import json

conn = disq.Disque()

def do_stuff_with(job):
    # stand-in for the actual text-analysis work
    return {'processed': json.loads(job.decode('utf8'))}

def handle_job():
    # block until a job arrives on the shared 'worker' queue
    qname, job_id, job = conn.getjob('worker')[0]
    print("got job {}".format(job_id))
    try:
        result = do_stuff_with(job)
    finally:
        conn.fastack(job_id)
    # reply on a queue named after the job id, which the client is waiting on
    conn.addjob(job_id, json.dumps(result))

if __name__ == "__main__":
    while True:
        handle_job()

One last note: since the Disque HELLO command returns the cluster nodes’ internal IP addresses, my Disque client, running on my local machine, could not connect to the private IPs of the GCE cluster. This problem arises with any distributed infrastructure, but a simple solution would be to issue the CLUSTER MEET commands with public IPs.

One small issue with Disque (and Redis clustering) is that its CLUSTER MEET command expects an IP address and returns an error when a hostname is used instead. I believe this should be improved, with the cluster’s hostnames returned as part of the HELLO command. However, this is a topic of active discussion in the Redis community, so I will patiently wait for it to resolve itself.
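
In the meantime, one workaround is to resolve hostnames client-side and hand CLUSTER MEET the resulting (public) IPs. A minimal sketch, again speaking the Redis protocol to a Disque node with redis-py; the hostnames here are placeholders:

import socket

import redis

# Placeholder addresses: one reachable seed node plus the other nodes' hostnames.
seed = redis.Redis(host='disque-0.example.com', port=7711)

for hostname in ('disque-1.example.com', 'disque-2.example.com'):
    # CLUSTER MEET refuses hostnames, so resolve each one to an IP first
    ip = socket.gethostbyname(hostname)
    seed.execute_command('CLUSTER', 'MEET', ip, 7711)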

In any case, I hope this post was helpful and sufficient for folks just getting started with Disque.
