Job Scheduling with Redis

One queue to rule them all

As a Managed Security Provider we currently operate more than 3000 hosts all around the globe. This only scales if repetitive tasks like activating a new configuration or rolling out new firmware on some or all of the hosts can be automated. Some tasks need to run immediately, like the activation of a change in a distributed firewall ruleset. Some run regularly, like DNSSEC key rollovers. And some, like firmware upgrades, depend on customer-approved service windows. They are all managed by our Job Scheduling System with distributed worker nodes which coordinate via a Redis database. Now let's look at how this all fits together.

Why Redis?

Redis is an open-source, in-memory key-value database which also supports some more complex data structures. It is single-threaded, which is ideal for distributed locking and data exchange, as this avoids concurrency problems. Still, it is fast and easy to use from virtually every popular programming language.

Let's Work!

The heart of the Job Scheduling System is the job queue, implemented as a sorted set in Redis. A sorted set stores values sorted according to a score. So in the job queue we "score" each job id with the time (as UNIX epoch) at which the job should be executed.
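
As a minimal sketch (using redis-py; the key name "jobqueue" and the job id format are placeholders of ours, not the production schema), enqueuing a job looks like this:

    import time
    import redis

    r = redis.Redis()

    # Schedule job "job:1234" to run in five minutes: the member is the
    # job id, the score is the desired execution time as UNIX epoch.
    r.zadd("jobqueue", {"job:1234": time.time() + 300})

    # The front of the queue is the job with the smallest score, i.e.
    # the earliest scheduled execution time.
    print(r.zrange("jobqueue", 0, 0, withscores=True))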

And now, what is the job of the worker? (A sketch of this loop in code follows the list.)

  • Is there any work?
    • Take a small number of job ids from the front of the job queue.
    • Go to sleep if the execution time (score) is in the future or there are no jobs.
  • Lock the job
    • Try to lock the job in Redis. The lock expires at a job individual timeout.
    • Check that the execution time of the job has not changed in the meantime. If it has, some other worker already worked on that job and put it back into the job queue.
    • Try the next job if any of the above steps failed.
  • Work
    • In the job queue move the execution time of the job into the future according to the timeout.
    • Work on the job.
  • Cleanup
    • If all is done, remove the job from the job queue.
    • If future actions are required, update the execution time in the job queue.
    • Remove the lock on the job.
  • Any further work?
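
Putting these steps together, one pass of the loop might look roughly like this; the key names, the worker id and the redis-py client are our own placeholders, not the production code:

    import time
    import redis

    r = redis.Redis()
    QUEUE = "jobqueue"   # sorted set: job id scored by scheduled UNIX epoch

    def do_the_real_work(job_id):
        print("working on", job_id)   # placeholder for the actual job logic

    def try_to_work_one(timeout=300, batch=10):
        """One pass over the steps above; returns False if it is time to sleep."""
        # Is there any work? Take a few job ids from the front of the queue.
        for job_id, score in r.zrange(QUEUE, 0, batch - 1, withscores=True):
            now = time.time()
            if score > now:
                return False            # execution times are in the future: sleep
            # Lock the job; the lock expires after the job's individual timeout.
            if not r.set(b"lock:" + job_id, "worker-1", nx=True, ex=timeout):
                continue                # locked by another worker: try the next job
            # Check that the execution time did not change in between;
            # otherwise another worker already handled this job.
            if r.zscore(QUEUE, job_id) != score:
                r.delete(b"lock:" + job_id)
                continue
            # Work: first move the execution time into the future by the
            # timeout, so that a dead worker is detected once it passes.
            r.zadd(QUEUE, {job_id: now + timeout})
            do_the_real_work(job_id)
            # Cleanup: done for good, so drop the job and its lock. A job
            # needing future action would instead get a new execution time.
            r.zrem(QUEUE, job_id)
            r.delete(b"lock:" + job_id)
            return True
        return False                    # no jobs at all: sleep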

The timeout is important for recognizing and cleaning up problems in the job execution process. Jobs being worked on are marked as in execution. If a worker has a problem with the job execution (hangs, gets killed, etc.) and does not do the bookkeeping as it should, the job still gets picked up later by another worker. That worker will recognize that the job should already be in execution and thus will remove it from the job queue and signal an error.
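
A hedged sketch of that recovery path, assuming the in-execution mark is a per-job status key (the key names are again placeholders):

    import redis

    r = redis.Redis()
    QUEUE = "jobqueue"

    def handle_resurfaced_job(job_id):
        """Called when a due job could be locked (illustrative key names).
        If it is still marked as in execution, its worker must have died
        without doing the bookkeeping."""
        if r.get(b"status:" + job_id) == b"running":
            # The lock expired and the execution time has passed, yet the
            # job claims to be running: drop it and signal an error.
            r.zrem(QUEUE, job_id)
            r.set(b"status:" + job_id, "failed")
            print("job", job_id, "lost its worker during execution")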

Scaling Up, Scaling Down

Scaling up is as easy as adding more workers. Here the contention is usually on other resources and not on the Redis server, as the real job work usually takes much longer than managing the jobs as such.

But much of the time there is nothing to do, so ideally the workers should do nothing too. To avoid having many processes polling the job queue, we have one master process on each worker node which does the polling. If it manages to get a job to execute, it forks a child worker process that does the real work. If there are further jobs in the queue, the master process spawns new worker children until a configured limit is reached. When a child worker is finished with its job, it tries to get a new one from the job queue and only exits if there is nothing left to do.
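
A rough sketch of this master/child structure, with stand-in functions for the queue operations shown earlier (the limit and function names are illustrative):

    import os
    import time

    MAX_CHILDREN = 8   # configured per-node limit (value is illustrative)

    def queue_has_due_jobs():
        """Stand-in: peek at the front of the Redis sorted set and report
        whether the first job's execution time is already in the past."""
        return False

    def work_one_job():
        """Stand-in for one pass of the worker loop sketched earlier;
        returns True if a job was executed, False if nothing was due."""
        return False

    def child_worker():
        # A child keeps taking jobs and only exits when nothing is left.
        while work_one_job():
            pass
        os._exit(0)

    def master_loop():
        children = set()
        while True:
            # Reap worker children that have exited.
            while children:
                pid, _ = os.waitpid(-1, os.WNOHANG)
                if pid == 0:
                    break
                children.discard(pid)
            # The master alone polls the queue; it forks a child per due
            # job, up to the configured limit.
            if len(children) < MAX_CHILDREN and queue_has_due_jobs():
                pid = os.fork()
                if pid == 0:
                    child_worker()      # never returns
                children.add(pid)
            else:
                time.sleep(1)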

Fair Scheduling

Having that in place and working as intended, we ran into a usability problem. Imagine Alice wants to activate a change on all hosts and thus spawns some 3000 jobs to be run immediately. It may take an hour to work through this chunk, but that is OK for Alice: it is indeed a big task. But then comes Bob with a change for about three or four hosts. Because they are placed later in the job queue, they will wait for Alice's jobs to finish, so the little task will also take more than an hour. This is not really what Bob wanted or expected.

Somehow we should prioritize Bob's task. But any kind of priority would spoil the simple design of the Job Scheduling System. Still, we can do some "prioritization" by moving a job on the timeline in the job queue. We might spread out the jobs of the big task so that part of it runs later, leaving room for small tasks in between. But big tasks should also run as fast as possible, and we do not know ahead of time how long they need or how many workers are available. So if there is no overload, no prioritization is needed. But in an overload situation, we may reorder the jobs which should have been run in the past. The exact value of the desired execution time in the job queue is not important anymore: we are already too late. So we implemented a fair reordering which "gives" the same worker share to each delayed task.

Fair Scheduling Reordering

The implementation is quite straightforward. The first worker which gets a job that has already been waiting too long grabs an "overload handling lock", so the other workers continue working hard on emptying the job queue. Then all pending jobs are iterated in reverse order. The first job encountered from each task is placed at the front of the queue, the following ones a bit later, but always earlier than the current time. The reordering stops as soon as a job is met which is being or has already been processed by another worker. Now Bob no longer notices that Alice is hammering the system.
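
A sketch of the reordering, under the assumption that the owning task can be derived from the job id (the "task:number" naming scheme here is our invention for the example):

    import time
    import redis

    r = redis.Redis()
    QUEUE = "jobqueue"

    def fair_reorder():
        # Only one worker handles the overload; the others keep working.
        if not r.set("overload-lock", "worker-1", nx=True, ex=60):
            return
        now = time.time()
        per_task = {}   # task -> its overdue jobs, latest-scheduled first
        # Iterate the overdue jobs in reverse order.
        for job_id in r.zrevrangebyscore(QUEUE, now, "-inf"):
            if r.exists(b"lock:" + job_id):
                break    # reached a job a worker is processing: stop here
            task = job_id.split(b":")[0]
            per_task.setdefault(task, []).append(job_id)
        # Hand out new scores round-robin, one job per task per round, so
        # each delayed task gets the same worker share; all scores stay
        # in the past, since the exact value no longer matters.
        total = sum(len(jobs) for jobs in per_task.values())
        score = now - total
        while per_task:
            for task in list(per_task):
                r.zadd(QUEUE, {per_task[task].pop(): score})  # earliest first
                score += 1
                if not per_task[task]:
                    del per_task[task]
        r.delete("overload-lock")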

Pingmachine

A highly efficient and easy to use probing framework

On our devices we do a lot of active monitoring with ping probes. We have more than 3000 Linux hosts that use ping probes for:

  • Measuring the availability of our hosts, as seen from our infrastructure ("sang")
  • Measuring the performance of VPN tunnels ("tmon")
  • Measuring the availability of Internet links ("linkmon")
  • Detecting that a link is down for a link failover ("link-failover")
  • Choosing a download server for signature updates ("clientha")

Doing ping probes is not trivial, because you usually need to do the probing asynchronously. Instead of reimplementing this every time, we have developed a common component that can take care of executing ping probes and collecting the results. We called it Pingmachine.

It is heavily inspired by Smokeping by Tobi Oetiker, but designed to be used as a monitoring component that runs in the background (whereas Smokeping is a complete application).

The basic idea is that any monitoring component that needs ping probes just defines "ping orders" by writing YAML-formatted files and waits for the results of that order to appear. It's a very simple API, and it greatly simplified this part of our monitoring applications.
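
For illustration only, writing such an order might look like this; the field names and the orders directory are assumptions on our part, the authoritative format is documented in the repository:

    import yaml   # PyYAML

    # A hypothetical ping order; these fields and the path below are
    # illustrative guesses, not the documented pingmachine schema.
    order = {
        "user": "linkmon",              # the component that owns this order
        "step": 300,                    # measurement period in seconds
        "pings": 20,                    # number of probes per period
        "probe": "fping",
        "fping": {"host": "192.0.2.1"},
    }

    # Writing the file is the whole API: pingmachine picks the order up
    # and publishes results for it, which the component then polls.
    with open("/var/lib/pingmachine/orders/example", "w") as f:
        yaml.safe_dump(order, f)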

Having a single component take care of the probing also ensures that we can better control what probing is being done, and it makes probing issues easier to debug. There is a "pingmachine-status" command-line program that reports all ping orders that are currently active, along with their results.

Most of the probing is executed by fping, whose maintenance is also sponsored by Open Systems. Other probes can be easily defined to implement measurements not based on ICMP ECHO, but on anything else (SSH connections, for example, or HTTP requests).

Pingmachine is self-contained and doesn't depend on any other software written by us. We have published it on GitHub: https://github.com/open-ch/pingmachine/