Programming a Semi-Autonomous Blimp with Python

Hearsay Social will again be hosting the San Francisco Django Meetup on Wednesday, May 28th starting at 6:00 PM. Click here to RSVP or learn more. The talk was also presented at PyCon 2014, but was constrained to only 20 minutes.

A whole series of blog posts document the work as it progressed starting on March 15th.

Join us to reminisce over early 20th century technology, drink beer, talk code, and possibly inhale helium.

Experiences with Celery v3.1

Since we upgraded to Celery v3.1, we've run into numerous issues with this latest iteration. Here are a few snags we discovered that we thought we'd share:

  1. If you're running Ubuntu 12.04 or Debian Squeeze, consider installing the uuidd daemon. Celery relies on the libuuid library for task ID generation, and Celerybeat can fail to dispatch tasks in daemon mode because of stale file descriptor references to /dev/urandom. There have been numerous fixes to help mitigate this problem, but if you continue to see tasks not being dispatched, consider simply running the uuidd daemon on any machine that uses Celery. (For more information, see this previous post.)

  2. If you're using RabbitMQ and have upgraded to v3.3.0, you must upgrade to Celery v3.1.11. If you're using librabbitmq, you should also upgrade to the latest version (v1.5.1). Celery workers are configured to process only a certain number of messages at a time using the prefetch count. Since scheduled tasks created with countdown/eta parameters are held in memory and left unacknowledged by the workers until they can be executed, Celery needs to increase this prefetch count accordingly. (A short sketch of such a countdown task follows this list.)

    In RabbitMQ v3.3.0, changes were made to how this prefetch count is managed. Without an upgrade to Celery v3.1.11, requests to increase this number would be applied only to new connections, not to existing ones. As a result, Celery would stop processing any further tasks once the prefetch limit was reached, causing delays in task dispatching. (For more information, see the reported issue on GitHub and the RabbitMQ release notes.)

  3. The latest version of librabbitmq now appears to support connection pooling, so you must be especially careful when using Celery with concurrent workers. Upon forking, child processes inherit existing connections from their parent process, so care must be taken to close any open connections. For instance, if you issue any type of control command before the process is forked, you must make sure the other processes do not end up reusing that connection.

    The reason is that AMQP depends on frames being sent in sequence. Multiple processes sharing the same connection will interleave packets, triggering "UNEXPECTED_FRAME - expected content header for class 60" errors from the message broker. It can also cause workers to stall while waiting for a response from the AMQP broker.

    One example of how to fix this issue is to close the AMQP connection using the worker_init signal provided by Celery:

    from celery import current_app
    from celery.signals import worker_init
    
    def cleanup_connections(*args, **kwargs):
        current_app.close()   # try to avoid reusing the same AMQP connection pools across workers
    
    worker_init.connect(cleanup_connections)

    For more background information, see this discussion thread and this posting about fork safety.

  4. If you are using the MAX_TASKS_PER_CHILD parameter to help avoid the memory fragmentation issues that can occur with long-lived Python processes (a configuration sketch follows this list), be wary of workers crashing intermittently once this maximum number of tasks has been reached. Celery v3.1 implements a new asynchronous event loop which relies on the main process to receive messages and dispatch tasks to the worker processes via interprocess pipes. However, there are issues with the way file descriptors are monitored in this event loop, causing AssertionErrors to be triggered and workers to stop receiving messages.

    We have contributed a number of fixes to help resolve this, most notably improvements to the test suite that make the problem easier to catch. The main fix has already been merged, so we are waiting on a new release to resolve these intermittent crashes. We're also hopeful it will solve the problem of orphaned workers no longer waiting for new messages to process. (For more context, see our previous blog post.)
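To make the prefetch behavior from item 2 concrete, here is a minimal sketch; the task name, arguments, and broker URL are hypothetical, but the countdown mechanics are standard Celery:

    from celery import Celery

    app = Celery('tasks', broker='amqp://guest@localhost//')

    @app.task
    def send_digest(user_id):
        pass  # placeholder body; irrelevant to the prefetch behavior

    # The worker reserves this message immediately but leaves it unacknowledged
    # until the 10-minute countdown expires, so every pending countdown/eta task
    # occupies a prefetch slot and Celery must raise the prefetch count to keep
    # pulling in new work.
    send_digest.apply_async(args=[42], countdown=600)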
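And for item 4, this is roughly how the recycling limit is configured; the setting and flag names below are the ones we believe apply to Celery v3.1, so verify them against your version's documentation:

    from celery import Celery

    app = Celery('tasks', broker='amqp://guest@localhost//')

    # Recycle each pooled worker process after it has executed 100 tasks,
    # trading some fork overhead for a bound on memory fragmentation.
    app.conf.CELERYD_MAX_TASKS_PER_CHILD = 100

    # Equivalent command-line form:
    #   celery -A tasks worker --maxtasksperchild=100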

If you have any other experiences with Celery v3.1, we'd love to hear from you!

Celery v3.1 and Asynchronous Event Loops

One of the major changes in Celery v3.1 is the way in which concurrent workers are managed. Celery workers are separate processes, so they must rely on interprocess communication (IPC) pipes to communicate. In the past, a single pipe was shared by all the workers to receive jobs from the master. In this new version, the master establishes separate communication channels for each worker: one pipe is created for the master to send messages to the worker, and another is used by the worker to send status updates back to the master.
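Conceptually, the arrangement looks something like the following sketch, which uses plain multiprocessing pipes rather than Celery's internals; the master owns a dedicated pair of one-way channels per worker instead of one shared pipe:

from multiprocessing import Process, Pipe

def worker(jobs_r, status_w):
    job = jobs_r.recv()               # read a job from the master
    status_w.send(('done', job * 2))  # report status back to the master

if __name__ == '__main__':
    channels = []
    for _ in range(2):
        jobs_r, jobs_w = Pipe(duplex=False)      # master -> worker
        status_r, status_w = Pipe(duplex=False)  # worker -> master
        Process(target=worker, args=(jobs_r, status_w)).start()
        channels.append((jobs_w, status_r))

    # the master dispatches to a specific worker through that worker's own
    # job pipe and reads its replies from that worker's own status pipe
    for i, (jobs_w, status_r) in enumerate(channels):
        jobs_w.send(i)
        print(status_r.recv())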

How does this new implementation work in practice? When Celery first gets instantiated, it creates a separate pair of SimpleQueue instances for each process it forks: an input queue used to deliver messages to the worker, and an output queue used to send back responses.

concurrency/asynpool.py:

def create_process_queues(self):
    """Creates new in, out (and optionally syn) queues,
    returned as a tuple."""
    # NOTE: Pipes must be set O_NONBLOCK at creation time (the original
    # fd), otherwise it will not be possible to change the flags until
    # there is an actual reader/writer on the other side.
    inq = _SimpleQueue(wnonblock=True)
    outq = _SimpleQueue(rnonblock=True)

The file descriptors for these queues are what Celery v3.1's asynchronous event loop uses to dispatch jobs to each worker process. The event loop cycles through the known file descriptors to see whether any workers are ready to receive new messages. If a worker's file descriptor is available for writing, the master process takes the next available message and writes it to that worker's input queue.

def schedule_writes(ready_fds, shuffle=random.shuffle):
    # Schedule write operation to ready file descriptor.
    # The file descriptor is writeable, but that does not
    # mean the process is currently reading from the socket.
    # The socket is buffered so writeable simply means that
    # the buffer can accept at least 1 byte of data.
    shuffle(ready_fds)
    for ready_fd in ready_fds:
        if ready_fd in active_writes:
            # already writing to this fd
            continue

This event loop also attempts to maintain the same number of workers that were first initialized. If a worker has exited after executing the maximum number of tasks it is allowed to run (set via the MAX_TASKS_PER_CHILD parameter), the master will attempt to spawn a new process to replace it. The previously used file descriptors must be closed, and new ones opened.

Keeping track of which file descriptors need to be monitored can therefore be tricky. The operating system (OS) will reuse previously closed file descriptors, so an application that maintains its own list of active descriptors can fall out of sync very quickly. In Celery v3.1, these issues manifested themselves as assertion errors that seemed to be triggered intermittently.
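A toy example, using plain os module calls and nothing Celery-specific, shows how quickly bookkeeping keyed on raw fd numbers can go stale:

import os

# the event loop tracks a worker by the raw fd numbers of its pipes
old_r, old_w = os.pipe()
readers = {old_r}

# the worker hits its task limit and is reclaimed: its fds are closed ...
os.close(old_r)
os.close(old_w)

# ... but the stale entry is never removed, and the OS hands the lowest
# free descriptors to the replacement worker's pipe
new_r, new_w = os.pipe()
print(old_r, new_r)      # typically the same number, now naming a new pipe
print(new_r in readers)  # typically True: the set now refers to the wrong pipe

In our workers, this kind of mismatch surfaced as tracebacks like the following: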

[2014-05-01 07:41:44,258: ERROR/MainProcess] Unrecoverable error: AssertionError()
Traceback (most recent call last):
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/worker/__init__.py", line 206, in start
    self.blueprint.start(self)
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/bootsteps.py", line 123, in start
    step.start(parent)
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/bootsteps.py", line 373, in start
    return self.obj.start()
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 278, in start
    blueprint.start(self)
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/bootsteps.py", line 123, in start
    step.start(parent)
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 821, in start
    c.loop(*c.loop_args())
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/worker/loops.py", line 72, in asynloop
    next(loop)
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/kombu/async/hub.py", line 274, in create_loop
    poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/kombu/async/hub.py", line 136, in fire_timers
    entry()
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/kombu/async/timer.py", line 64, in __call__
    return self.fun(*self.args, **self.kwargs)
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/concurrency/asynpool.py", line 509, in verify_process_alive
    assert proc.outqR_fd in hub.readers
AssertionError

The problem seemed to happen after the maximum number of tasks for a process had been reached, and it appeared to be related to what happens when an old worker is replaced by a new one. The processes themselves wouldn't crash while running tasks, so these assertion errors were more of a nuisance that triggered Nagios alerts and required manual restarts of Celery.

From the stack traces we collected, it was clear that something was wrong in Celery v3.1's new asynchronous loop. When Celery instantiates new workers, it schedules a check about 4 seconds later to verify whether the worker sent back a WORKER_UP message:

class Worker(_pool.Worker):
    """Pool worker process."""
    dead = False

    def on_loop_start(self, pid):
        # our version sends a WORKER_UP message when the process is ready
        # to accept work, this will tell the parent that the inqueue fd
        # is writable.
        self.outq.put((WORKER_UP, (pid, )))

If a WORKER_UP message is received by the master process, it marks the new worker as ready. Otherwise, it logs an error and attempts to kill the hung worker. The assertion errors seemed to be occurring during these checks.

To help simulate the behavior, I started adding delays to when the worker would send back its WORKER_UP message. I also increased the timeout from 4 to 10 seconds to mimic the behavior of a machine under high CPU load. By introducing these timing delays, I intended to exaggerate the issue and trigger more erratic behavior in the code. The diff below highlights the changes made to Celery's asynchronous event loop.

#: A process must have started before this timeout (in secs.) expires.
-PROC_ALIVE_TIMEOUT = 4.0
+PROC_ALIVE_TIMEOUT = 10.0

 SCHED_STRATEGY_PREFETCH = 1
 SCHED_STRATEGY_FAIR = 4
 class Worker(_pool.Worker):
         # our version sends a WORKER_UP message when the process is ready
         # to accept work, this will tell the parent that the inqueue fd
         # is writable.
+        import time
+        time.sleep(8)
+        error("Worker up!")
         self.outq.put((WORKER_UP, (pid, )))


 class AsynPool(_pool.Pool):
         waiting_to_start = self._waiting_to_start

         def verify_process_alive(proc):
+            error("Verifying process alive: proc=%s waiting_to_start=%s" % (proc, waiting_to_start))
+

After repeatedly stopping and starting the test program with 8 concurrent workers, I was able to trigger the same assertion errors. Eventually I was able to pinpoint the issue to two possible situations. In one, a file descriptor was being recycled by the OS and reused by another worker. In the other, the file descriptor previously used as a worker's input queue was handed to a new worker as its output queue. In both cases, the problem was that the file descriptors from an old worker were removed from the event loop's bookkeeping only after they had already been reclaimed for use by a new worker.

I was able to find a fix by pinpointing how this sequence gets triggered: workers that had reached the end of their lifetime were still considered to have jobs that had only been partially sent. When this happened, the process was marked dead but its file descriptors were not appropriately cleaned up. As a result, the asynchronous loop would close the file descriptor later, after it had already been reused.

The pull request to fix these intermittent assertion errors is located here. It appears to solve the immediate problem, though more robust ways of handling these possible race conditions are certainly warranted. A better long-term approach, especially when writing concurrent programs such as this one, may be to implement some type of reference counting, so that a file descriptor is removed from the list if and only if no other process is still using it.
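As a rough illustration of that idea (a sketch only, not code from the patch or from Celery), descriptors could be tracked with an owner count and dropped from the event loop's reader set only when the last owner releases them:

from collections import defaultdict

class FDRegistry(object):
    """Track which file descriptors the event loop should monitor."""

    def __init__(self):
        self._refs = defaultdict(int)   # fd -> number of owners
        self.readers = set()            # fds the event loop polls

    def acquire(self, fd):
        # a worker starts using this descriptor
        self._refs[fd] += 1
        self.readers.add(fd)

    def release(self, fd):
        # a worker stops using this descriptor; unregister it only once
        # no other process still holds a reference to the same fd number
        self._refs[fd] -= 1
        if self._refs[fd] <= 0:
            del self._refs[fd]
            self.readers.discard(fd)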