May 2, 2014

One of the major changes in Celery v3.1 is the way in which concurrent workers are managed. Celery workers are separate processes, so they must rely on interprocess communication (IPC) pipes to communicate. In the past, a single pipe was shared by all the workers to receive jobs from the master. In the new version, the master establishes separate communication channels for each worker: one pipe is created for the master to send messages to the worker, and another is used by the worker to send status updates back to the master.
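
To make the picture concrete, here is a minimal sketch (my own illustration, not Celery's code) of a master that forks a few workers and keeps a dedicated pair of pipes for each one:

import os

workers = {}
for _ in range(4):
    in_r, in_w = os.pipe()    # master writes jobs, worker reads them
    out_r, out_w = os.pipe()  # worker writes status, master reads it
    pid = os.fork()
    if pid == 0:
        # child: keep only its own ends of the two pipes
        os.close(in_w); os.close(out_r)
        job = os.read(in_r, 1024)          # receive a job from the master
        os.write(out_w, b'done: ' + job)   # report a status update back
        os._exit(0)
    # parent: remember this worker's private channel ends
    os.close(in_r); os.close(out_w)
    workers[pid] = (in_w, out_r)

for pid, (in_w, out_r) in workers.items():
    os.write(in_w, ('job %d' % pid).encode())  # dispatch to one specific worker
    print(os.read(out_r, 1024))
for _ in workers:
    os.wait()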

How does this new implementation work in practice? When Celery first gets instantiated, it creates a separate pair of SimpleQueue instances for each process it forks: an input queue the worker reads jobs from, and an output queue it uses to send responses back to the master.

concurrency/asynpool.py:

def create_process_queues(self):
    """Creates new in, out (and optionally syn) queues,
    returned as a tuple."""
    # NOTE: Pipes must be set O_NONBLOCK at creation time (the original
    # fd), otherwise it will not be possible to change the flags until
    # there is an actual reader/writer on the other side.
    inq = _SimpleQueue(wnonblock=True)
    outq = _SimpleQueue(rnonblock=True)

The file descriptors for these queues are what Celery v3.1's new asynchronous event loop uses to dispatch jobs to each worker process. The event loop cycles through the known file descriptors to see whether any workers are ready to receive new messages. If a file descriptor is writable, the master process takes the next available message and writes it to that worker's input queue.

def schedule_writes(ready_fds, shuffle=random.shuffle):
    # Schedule write operation to ready file descriptor.
    # The file descriptor is writeable, but that does not
    # mean the process is currently reading from the socket.
    # The socket is buffered so writeable simply means that
    # the buffer can accept at least 1 byte of data.
    shuffle(ready_fds)
    for ready_fd in ready_fds:
        if ready_fd in active_writes:
            # already writing to this fd
            continue

This event loop also attempts to maintain the same number of workers that were first initialized. If a worker has exited after processing its maximum number of tasks (set via the --maxtasksperchild option), the master will spawn a new process to replace it. The old worker's file descriptors must be closed, and new ones opened.
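
The respawning behavior itself is easy to observe with the standard library's pool, which is a rough analogue of what Celery's pool does (not Celery's actual implementation): with maxtasksperchild set, worker PIDs change as exhausted processes are replaced.

import os
from multiprocessing import Pool

def whoami(_):
    return os.getpid()

if __name__ == '__main__':
    # Each worker may only run 2 tasks before the pool replaces it with a
    # brand-new process, much like Celery's --maxtasksperchild.
    pool = Pool(processes=2, maxtasksperchild=2)
    pids = pool.map(whoami, range(8))
    pool.close()
    pool.join()
    # Typically prints 4 distinct PIDs even though only 2 workers run at a time.
    print(sorted(set(pids)))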

Keeping track of which file descriptors need to be monitored can therefore be tricky. The operating system (OS) will reuse previously closed file descriptors, so an application that maintains its own list of active descriptors can fall out of sync very quickly. In Celery v3.1, these issues manifested themselves as assertion errors that seemed to be triggered intermittently.
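
A short experiment shows why: the kernel hands out the lowest unused descriptor number, so a closed descriptor can be reissued to a completely unrelated pipe moments later.

import os

r1, w1 = os.pipe()
print(r1, w1)          # e.g. 3 4
os.close(r1)
os.close(w1)
r2, w2 = os.pipe()
print(r2, w2)          # typically 3 4 again: same numbers, different pipes
# Any table that still maps fd 3 to the old worker is now silently wrong.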

[2014-05-01 07:41:44,258: ERROR/MainProcess] Unrecoverable error: AssertionError()
Traceback (most recent call last):
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/worker/__init__.py", line 206, in start
    self.blueprint.start(self)
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/bootsteps.py", line 123, in start
    step.start(parent)
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/bootsteps.py", line 373, in start
    return self.obj.start()
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 278, in start
    blueprint.start(self)
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/bootsteps.py", line 123, in start
    step.start(parent)
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 821, in start
    c.loop(*c.loop_args())
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/worker/loops.py", line 72, in asynloop
    next(loop)
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/kombu/async/hub.py", line 274, in create_loop
    poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/kombu/async/hub.py", line 136, in fire_timers
    entry()
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/kombu/async/timer.py", line 64, in __call__
    return self.fun(*self.args, **self.kwargs)
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/concurrency/asynpool.py", line 509, in verify_process_alive
    assert proc.outqR_fd in hub.readers
AssertionError

The problem seemed to happen after a process had reached its maximum number of tasks. In addition, it seemed to be related to what happens when an old worker had to be replaced by a new one. The processes themselves wouldn't crash while running tasks, so these assertion errors were more of a nuisance that triggered Nagios alerts and required manual restarts of Celery.

From the stack traces we collected, it was clear that something was wrong in Celery v3.1's new asynchronous loop. When Celery instantiates new workers, it schedules a check about 4 seconds later to verify whether the worker has sent back a WORKER_UP message:

class Worker(_pool.Worker):
    """Pool worker process."""
    dead = False

    def on_loop_start(self, pid):
        # our version sends a WORKER_UP message when the process is ready
        # to accept work, this will tell the parent that the inqueue fd
        # is writable.
        self.outq.put((WORKER_UP, (pid, )))

If a WORKER_UP message is received by the master process, it marks the new worker as ready. Otherwise, it logs an error and attempts to kill the presumably hung worker. The assertion errors seemed to be occurring during these checks.
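
Stripped of the event-loop machinery, the handshake amounts to something like the following toy version (my own simplification, not Celery's code): the parent waits up to PROC_ALIVE_TIMEOUT seconds for the child's WORKER_UP message and kills the child if it never arrives.

import multiprocessing, os, queue

PROC_ALIVE_TIMEOUT = 4.0

def worker(outq):
    # the child announces itself once it is ready to accept work
    outq.put(('WORKER_UP', os.getpid()))
    # ... a real worker would start consuming jobs here ...

if __name__ == '__main__':
    outq = multiprocessing.Queue()
    proc = multiprocessing.Process(target=worker, args=(outq,))
    proc.start()
    try:
        msg, pid = outq.get(timeout=PROC_ALIVE_TIMEOUT)
        print('%s received from pid %d' % (msg, pid))
    except queue.Empty:
        print('Timed out waiting for UP message; killing worker')
        proc.terminate()
    proc.join()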

To help simulate the behavior, I started adding delays to when the worker would send back its WORKER_UP message. I also increased the timeout from 4 to 10 seconds to help mimic the behavior of a machine under high CPU usage. By introducing timing delays, I intended to exaggerate the issue and trigger more erratic behavior in the code. The diff below highlights the changes made to Celery's asynchronous event loop.

#: A process must have started before this timeout (in secs.) expires.
-PROC_ALIVE_TIMEOUT = 4.0
+PROC_ALIVE_TIMEOUT = 10.0

 SCHED_STRATEGY_PREFETCH = 1
 SCHED_STRATEGY_FAIR = 4
 class Worker(_pool.Worker):
         # our version sends a WORKER_UP message when the process is ready
         # to accept work, this will tell the parent that the inqueue fd
         # is writable.
+        import time
+        time.sleep(8)
+        error("Worker up!")
         self.outq.put((WORKER_UP, (pid, )))


 class AsynPool(_pool.Pool):
         waiting_to_start = self._waiting_to_start

         def verify_process_alive(proc):
+            error("Verifying process alive: proc=%s waiting_to_start=%s" % (proc, waiting_to_start))
+

After stopping and starting the test program with 8 concurrent workers, I was able to trigger the same assertion errors. Eventually I pinpointed the issue to two possible situations. In one, the file descriptors of an old worker were recycled and reused by another worker. In the other, the file descriptors previously used as a worker's input queue were reused by a new worker as its output queue. In both cases, the issue was that the old worker's file descriptors were removed from the event loop's bookkeeping only after they had already been reclaimed for use by a new worker.

I was able to find a fix by tracing how this sequence gets triggered: workers that had reached the end of their lifetime were still considered to have jobs that were only partially sent. When this happened, the process was marked dead but its file descriptors were not properly cleaned up. As a result, the asynchronous loop would close the file descriptor later, after it had already been reused.

The pull request to fix these intermittent assertion errors is located here. It appears to solve the immediate problem, though more robust ways of handling these possible race conditions are certainly warranted. A better long-term approach, especially when writing concurrent programs such as this one, may be to implement some type of reference-counting system, so that a file descriptor is removed from the list only when no other part of the program is still using it.
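
As an illustration of that idea (a sketch, not part of the pull request), a descriptor would be closed and dropped from the watched set only once every holder has released it:

import os

class FDRegistry:
    """Reference-counted file descriptors: close only when nobody uses them."""
    def __init__(self):
        self._refs = {}

    def acquire(self, fd):
        self._refs[fd] = self._refs.get(fd, 0) + 1

    def release(self, fd):
        self._refs[fd] -= 1
        if self._refs[fd] == 0:
            del self._refs[fd]
            os.close(fd)
            return True        # caller should also stop watching this fd
        return False

registry = FDRegistry()
r, w = os.pipe()
registry.acquire(w)            # the event loop is watching it
registry.acquire(w)            # a partially written job still references it
print(registry.release(w))     # False: still in use elsewhere
print(registry.release(w))     # True: last reference gone, fd really closed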
