One of the major changes in Celery v3.1 is the way in which concurrent workers are managed. Celery
workers are separate processes, so they must rely on interprocess communication
(IPC) pipes to communicate. In the past, a single pipe was shared by
all the workers to receive jobs from the master. In this new
version, the master establishes separate communication channels for each
worker: one pipe is used by the master to send messages to the worker,
and another is used by the worker to send status updates back to the master.
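To make the topology concrete, here is a minimal, self-contained sketch of the idea: a master forking a single worker with a dedicated job pipe and a dedicated status pipe. This is an illustration only, not Celery's actual implementation.

import os

def spawn_worker():
    # Two dedicated pipes per worker: one carries jobs from the master to
    # the worker, the other carries status updates back to the master.
    job_r, job_w = os.pipe()        # master writes jobs, worker reads them
    status_r, status_w = os.pipe()  # worker writes status, master reads it

    pid = os.fork()
    if pid == 0:                    # child: the worker process
        os.close(job_w)
        os.close(status_r)
        job = os.read(job_r, 1024)           # wait for a job from the master
        os.write(status_w, b'done: ' + job)  # report back on the other pipe
        os._exit(0)

    # parent: the master keeps the opposite ends of both pipes
    os.close(job_r)
    os.close(status_w)
    return pid, job_w, status_r

pid, job_w, status_r = spawn_worker()
os.write(job_w, b'task-1')
print(os.read(status_r, 1024))      # b'done: task-1'
os.waitpid(pid, 0)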
How does this new implementation work in practice? When the Celery worker pool first
starts, it creates separate SimpleQueue
instances for each process it forks. One queue is used by the worker to receive messages, while the output queue is used to
send responses back to the master.
concurrency/asynpool.py:

def create_process_queues(self):
    """Creates new in, out (and optionally syn) queues,
    returned as a tuple."""
    # NOTE: Pipes must be set O_NONBLOCK at creation time (the original
    # fd), otherwise it will not be possible to change the flags until
    # there is an actual reader/writer on the other side.
    inq = _SimpleQueue(wnonblock=True)
    outq = _SimpleQueue(rnonblock=True)
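The wnonblock and rnonblock flags boil down to setting O_NONBLOCK on the underlying pipe file descriptors. Here is a small standalone example of what that flag buys the master, assuming a plain Unix pipe created with os.pipe:

import errno
import fcntl
import os

r, w = os.pipe()

# Set O_NONBLOCK on the write end right after creating the pipe, which is
# what the wnonblock=True argument above arranges for the queue's writer fd.
flags = fcntl.fcntl(w, fcntl.F_GETFL)
fcntl.fcntl(w, fcntl.F_SETFL, flags | os.O_NONBLOCK)

# With O_NONBLOCK set, a write to a full pipe buffer fails with EAGAIN
# instead of blocking the caller -- which is what keeps the master's
# event loop from stalling on a slow worker.
try:
    while True:
        os.write(w, b'x' * 65536)
except OSError as exc:
    print('write would block: errno=%d (EAGAIN=%d)' % (exc.errno, errno.EAGAIN))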
Celery v3.1's asynchronous event loop uses the file descriptors of these queues to dispatch
jobs to the worker processes. The loop cycles through the known file
descriptors to see whether any workers are ready to receive new messages. If
a file descriptor is available, the master process takes the next
pending message and writes it to that worker's input queue.
def schedule_writes(ready_fds, shuffle=random.shuffle):
    # Schedule write operation to ready file descriptor.
    # The file descriptor is writeable, but that does not
    # mean the process is currently reading from the socket.
    # The socket is buffered so writeable simply means that
    # the buffer can accept at least 1 byte of data.
    shuffle(ready_fds)
    for ready_fd in ready_fds:
        if ready_fd in active_writes:
            # already writing to this fd
            continue
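Stripped of partial-write handling and shuffling, the pattern that schedule_writes implements looks roughly like this self-contained sketch, where two os.pipe pairs stand in for the workers' input queues:

import os
import select
from collections import deque

# Stand-ins for two workers' input queues: the master keeps the write ends.
worker_pipes = [os.pipe() for _ in range(2)]
inq_fds = [w for (_, w) in worker_pipes]

jobs = deque([b'job-1', b'job-2', b'job-3'])
active_writes = set()   # fds that already have a write in flight

while jobs:
    # Ask the OS which worker pipes can accept at least one more byte.
    _, writable, _ = select.select([], inq_fds, [], 1.0)
    for fd in writable:
        if fd in active_writes or not jobs:
            continue
        os.write(fd, jobs.popleft())   # hand the next job to that worker

for r, _ in worker_pipes:
    print(os.read(r, 1024))            # normally read inside each worker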
This event loop also attempts to maintain the same number of workers that
were first initialized. If a worker exits after processing the maximum number of
tasks it is allowed to run (set via the MAX_TASKS_PER_CHILD
parameter), the loop will spawn a new process to replace the previous
one. The previously used file descriptors must be closed, and new ones opened.
Keeping track of which file descriptors need to be monitored can therefore be
tricky: the operating system (OS) will reuse previously closed file
descriptors, so an application that maintains its own list of active descriptors can fall out of
sync very quickly. In Celery v3.1, these issues manifested themselves as assertion errors that
seemed to be triggered intermittently:
[2014-05-01 07:41:44,258: ERROR/MainProcess] Unrecoverable error: AssertionError()
Traceback (most recent call last):
File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/worker/__init__.py", line 206, in start
self.blueprint.start(self)
File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/bootsteps.py", line 123, in start
step.start(parent)
File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/bootsteps.py", line 373, in start
return self.obj.start()
File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 278, in start
blueprint.start(self)
File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/bootsteps.py", line 123, in start
step.start(parent)
File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 821, in start
c.loop(*c.loop_args())
File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/worker/loops.py", line 72, in asynloop
next(loop)
File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/kombu/async/hub.py", line 274, in create_loop
poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/kombu/async/hub.py", line 136, in fire_timers
entry()
File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/kombu/async/timer.py", line 64, in __call__
return self.fun(*self.args, **self.kwargs)
File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/concurrency/asynpool.py", line 509, in verify_process_alive
assert proc.outqR_fd in hub.readers
AssertionError
The problem seemed to happen after the maximum number of tasks for a
process had been reached, and appeared to be related to what happens
when an old worker is replaced by a new one. The processes themselves
wouldn't crash while running tasks, so these assertion errors were more of a
nuisance that triggered Nagios alerts and required manual restarts of
Celery.
From the stack traces we collected, it was clear that something was wrong in
Celery v3.1's new asynchronous loop. When Celery spawns new workers,
it schedules a check about 4 seconds later to verify whether each worker has sent back a WORKER_UP
message:
class Worker(_pool.Worker):
    """Pool worker process."""
    dead = False

    def on_loop_start(self, pid):
        # our version sends a WORKER_UP message when the process is ready
        # to accept work, this will tell the parent that the inqueue fd
        # is writable.
        self.outq.put((WORKER_UP, (pid, )))
If the master process receives the WORKER_UP message, it marks the
new worker as ready. Otherwise, it logs an error and attempts to kill
the hung worker. The assertion errors seemed to be occurring during these
checks.
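The check itself amounts to roughly the sketch below. The names on_process_up, on_worker_up, and hub.call_later are simplified stand-ins, not a verbatim excerpt of celery/concurrency/asynpool.py; proc.outqR_fd and hub.readers are the attributes that show up in the traceback above.

# Illustrative sketch only -- simplified names and control flow.
import os

PROC_ALIVE_TIMEOUT = 4.0    # seconds a worker has to report WORKER_UP
waiting_to_start = set()    # workers that have not sent WORKER_UP yet

def on_process_up(hub, proc):
    waiting_to_start.add(proc)

    def verify_process_alive():
        if proc in waiting_to_start:
            # The worker never confirmed startup.  The loop expects to
            # still be watching the worker's output-queue fd -- this is
            # the assertion that fails in the traceback above.
            assert proc.outqR_fd in hub.readers
            os.kill(proc.pid, 9)    # give up on the hung worker

    # Check back on the worker a few seconds after it was forked.
    hub.call_later(PROC_ALIVE_TIMEOUT, verify_process_alive)

def on_worker_up(proc):
    # Runs when the WORKER_UP message arrives on the worker's output queue.
    waiting_to_start.discard(proc)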
To help simulate the behavior, I started adding delays to when the worker
would send back its WORKER_UP message. I also increased the timeout
from 4 to 10 seconds to mimic the behavior of a machine under high CPU
load. By introducing these timing delays, I intended to exaggerate the issue and
trigger more erratic behavior in the code. The section below highlights the
changes made to Celery's
asynchronous event loop.
 #: A process must have started before this timeout (in secs.) expires.
-PROC_ALIVE_TIMEOUT = 4.0
+PROC_ALIVE_TIMEOUT = 10.0

 SCHED_STRATEGY_PREFETCH = 1
 SCHED_STRATEGY_FAIR = 4


 class Worker(_pool.Worker):

     def on_loop_start(self, pid):
         # our version sends a WORKER_UP message when the process is ready
         # to accept work, this will tell the parent that the inqueue fd
         # is writable.
+        import time
+        time.sleep(8)
+        error("Worker up!")
         self.outq.put((WORKER_UP, (pid, )))


 class AsynPool(_pool.Pool):

         waiting_to_start = self._waiting_to_start

         def verify_process_alive(proc):
+            error("Verifying process alive: proc=%s waiting_to_start=%s" % (proc, waiting_to_start))
+
After stopping and restarting the test
program with 8 concurrent workers, I was able to trigger the same
assertion errors. Eventually I was able to narrow the issue down to two
situations. In the first, the file descriptors of an old worker were being recycled and
reused by another worker. In the second, the file descriptors previously
used as a worker's input queue were being reused by a new worker as its
output queue. In both cases, the root cause was the same: the file descriptors of an old
worker were only removed from the event loop's bookkeeping after they had already been
reclaimed by a new worker.
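The descriptor recycling itself is easy to demonstrate in isolation:

import os

# Close a worker's pipe and immediately create a new one: the kernel hands
# out the lowest unused descriptor numbers, so the replacement worker's
# queue typically lands on exactly the fds the old worker just gave up.
r1, w1 = os.pipe()
os.close(r1)
os.close(w1)
r2, w2 = os.pipe()
print((r1, w1), (r2, w2))   # usually the same two numbers both times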
I was able to find a fix by pinpointing how this sequence gets
triggered: workers that had reached the end of their lifetime were
still considered to have jobs that were only partially sent. When this
happened, the process was marked dead but its file descriptors were not appropriately
cleaned up. As a result, the asynchronous loop would close a file descriptor
later, after it had already been reused.
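In rough pseudocode, the missing cleanup looks something like the following. The names here (on_process_down, fileno_to_inq, hub.remove) are illustrative stand-ins for the pool's internal bookkeeping, not the actual patch:

# Rough sketch of the cleanup that was missing (illustrative names only).
def on_process_down(proc, hub, active_writes, fileno_to_inq):
    proc.dead = True
    # Forget the dead worker's queue fds everywhere they are tracked,
    # even if a job was only partially written to one of them.  Otherwise
    # the loop closes these fds later, after the OS has already handed
    # the same numbers to a replacement worker.
    for fd in (proc.inqW_fd, proc.outqR_fd):
        active_writes.discard(fd)
        fileno_to_inq.pop(fd, None)
        hub.remove(fd)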
The pull request to fix these intermittent assertion errors is located here. It appears to solve
the immediate problem, though more robust ways of handling these
possible race conditions are certainly warranted. A better long-term goal,
especially when writing concurrent programs such as this one, may be to
implement some type of reference-counting system. This way, a file descriptor is
removed from the list if and only if no other process is still using it.
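A minimal sketch of what such a reference-counting layer could look like (FDRegistry is a hypothetical helper, and hub.remove stands in for unregistering the descriptor from the event loop):

import os
from collections import Counter

class FDRegistry(object):
    """Hypothetical helper: close an fd only when its last user releases it."""

    def __init__(self, hub):
        self._hub = hub
        self._refs = Counter()

    def acquire(self, fd):
        self._refs[fd] += 1

    def release(self, fd):
        self._refs[fd] -= 1
        if self._refs[fd] <= 0:
            del self._refs[fd]
            self._hub.remove(fd)   # stop monitoring only when nobody uses it
            os.close(fd)

With something like this in place, both the old worker being reaped and the event loop itself would hold references, so a descriptor could never be closed out from under a replacement worker that had already started using the same number.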