June 16, 2014

Over the past weekend, I was trying to figure out how to enable some of RabbitMQ's extensions to the AMQP standard. One of these extensions lets a client receive a connection.blocked notification when RabbitMQ decides to start throttling connections, which can happen because of memory pressure or a large influx of messages being published to a queue. Among other issues, I wanted to understand why certain processes seemed to slow down without any obvious hint or warning.

We use the librabbitmq module, a high-performance AMQP client written in C for the Celery project. The problem is that librabbitmq doesn't expose a way to enable these extensions from Python. I wanted to pass in a dictionary of extension properties, similar to the hash that the Ruby AMQP client Bunny provides:

DEFAULT_CLIENT_PROPERTIES = {
  :capabilities => {
    :publisher_confirms           => true,
    :consumer_cancel_notify       => true,
    :exchange_exchange_bindings   => true,
    :"basic.nack"                 => true,
    :"connection.blocked"         => true,
    # See http://www.rabbitmq.com/auth-notification.html
    :authentication_failure_close => true
  }
  # ... (excerpt; any remaining properties omitted)
}

How would one modify the existing librabbitmq source code to do the same from Python? First, I had to understand how the Python C extension API works. Because segmentation faults and memory leaks are common when working in C, I also ended up rebuilding Python from source so that I could trace which line of code was causing the errors.
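
For reference, the Python counterpart of Bunny's capabilities hash would be a plain nested dictionary. The sketch below is only an illustration: the name CLIENT_PROPERTIES is hypothetical, and being able to hand such a dictionary to connect() is the goal of the changes described in this post, not something the unmodified librabbitmq API supports.

```python
# Hypothetical Python counterpart of the capabilities hash shown above.
CLIENT_PROPERTIES = {
    "capabilities": {
        "publisher_confirms": True,
        "consumer_cancel_notify": True,
        "exchange_exchange_bindings": True,
        "basic.nack": True,
        "connection.blocked": True,
        "authentication_failure_close": True,
    },
}

# Every capability is a real boolean, not 1 or the string "true".
assert all(v is True for v in CLIENT_PROPERTIES["capabilities"].values())
```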

Since all the underlying methods of the librabbitmq library are implemented in C, the first thing I needed to do was modify the connect() method of the Connection class so that it could accept arguments. Until the method's declaration in the C code changed, Python would reject any parameters passed to it. The change entailed switching the flag from METH_NOARGS to METH_VARARGS in connection.h:

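static PyMethodDef PyRabbitMQ_ConnectionType_methods[] = {
    {"fileno", (PyCFunction)PyRabbitMQ_Connection_fileno,
        METH_NOARGS, "File descriptor number."},
    /* Changed from METH_NOARGS so that connect() can take positional arguments. */
    {"connect", (PyCFunction)PyRabbitMQ_Connection_connect,
        METH_VARARGS, "Establish connection to the broker."},
    /* ... remaining methods omitted ... */
    {NULL, NULL, 0, NULL}  /* sentinel marking the end of the table */
};

(If I wanted keyword arguments as well, I could add the METH_KEYWORDS flag. The two flags can be combined with a bitwise OR, METH_VARARGS | METH_KEYWORDS, to accept both positional and keyword arguments.)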
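
The effect of these flags is visible from the Python side. The sketch below uses two built-ins instead of librabbitmq, on the assumption that they behave like C functions declared without arguments or without keyword support (how CPython declares them internally is an implementation detail). Both reject the unexpected argument with a TypeError, which is what connect() did before the change. The helper raises_type_error is a hypothetical name.

```python
def raises_type_error(func, *args, **kwargs):
    """Return True if calling func(*args, **kwargs) raises TypeError."""
    try:
        func(*args, **kwargs)
    except TypeError:
        return True
    return False

# str.upper() takes no arguments, much like a METH_NOARGS method.
no_args_rejected = raises_type_error("abc".upper, 1)

# len() accepts one positional argument but no keyword arguments,
# much like a method declared without METH_KEYWORDS.
keywords_rejected = raises_type_error(len, obj=[1, 2, 3])

# The same calls with the expected arguments succeed.
valid_calls_accepted = not raises_type_error("abc".upper) and not raises_type_error(len, [1, 2, 3])
```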

Once the method's signature accepted positional arguments, the next step was to convert those arguments into a data type that the AMQP library could use. The PyArg_ParseTuple() function lets me specify how to extract the arguments and offers different format options (e.g., converting a parameter into a native C integer). I decided to keep the argument as a Python object, since librabbitmq's C code has a special function called PyDict_ToAMQTable that takes a PyObject and converts it to a specific C data structure.

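static PyObject*
PyRabbitMQ_Connection_connect(PyRabbitMQ_Connection *self, PyObject *args)
{
    /* Must start as NULL: PyArg_ParseTuple() leaves optional outputs
       untouched when the caller omits them. */
    PyObject *client_properties = NULL;

    if (!PyArg_ParseTuple(args, "|O", &client_properties))
        goto bail;
    /* ... rest of the function omitted ... */

(Note the pipe symbol (|). It marks every argument after it as optional, so connect() can still be invoked with no arguments. In that case PyArg_ParseTuple() does not touch client_properties, which is why the variable is initialized to NULL.)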
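
In pure Python terms, the "|O" format string behaves roughly like a single optional parameter that is passed through unconverted. The following is a rough model of that behavior, not the extension's code; parse_optional_object is a hypothetical helper.

```python
def parse_optional_object(args, default=None):
    """Rough Python model of PyArg_ParseTuple(args, "|O", &obj).

    args is the tuple of positional arguments. Zero or one argument is
    accepted; the object is returned as-is, or default if it was omitted.
    """
    if len(args) > 1:
        raise TypeError("expected at most 1 argument, got %d" % len(args))
    return args[0] if args else default


props = {"capabilities": {"connection.blocked": True}}
assert parse_optional_object(()) is None          # like connect()
assert parse_optional_object((props,)) is props   # like connect(props)
```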

The next step was to take this PyObject type and convert it to a data structure for the AMQP client library. Specifically, I needed to figure out how to convert this PyObject to an amqp_table_t data structure, which is what the amqp_login_with_properties() function uses to send the correct wire format to the RabbitMQ server. The challenge was to figure out how to create this data structure and set the values accordingly.
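
To make the target format concrete, here is a minimal sketch of how a table of properties is laid out on the wire, based on the AMQP 0-9-1 field-table encoding as RabbitMQ implements it. It is a Python model for illustration, not the C code in librabbitmq: encode_table and encode_value are hypothetical names, and only booleans, 32-bit integers, strings, and nested tables are handled.

```python
import struct


def encode_value(value):
    """Encode one field value as a type tag followed by its payload."""
    if isinstance(value, bool):              # 't': boolean, one octet
        return b"t" + struct.pack(">B", int(value))
    if isinstance(value, int):               # 'I': signed 32-bit integer
        return b"I" + struct.pack(">i", value)
    if isinstance(value, str):               # 'S': long string
        data = value.encode("utf-8")
        return b"S" + struct.pack(">I", len(data)) + data
    if isinstance(value, dict):              # 'F': nested field table
        return b"F" + encode_table(value)
    raise TypeError("unsupported field type: %r" % type(value))


def encode_table(table):
    """Encode a dict as a length-prefixed sequence of (name, value) fields."""
    body = b""
    for key, value in table.items():
        name = key.encode("utf-8")
        # Field names are short strings: one length octet, then the bytes.
        body += struct.pack(">B", len(name)) + name + encode_value(value)
    return struct.pack(">I", len(body)) + body


wire = encode_table({"capabilities": {"connection.blocked": True}})
```

The nested dictionary becomes a table inside a table, which is the same shape as the capabilities hash in the Ruby client.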

Even though librabbitmq already had this function to convert Python objects, it didn't handle Python boolean types, so the wrong wire format was initially being sent to the RabbitMQ broker. I came to this conclusion only after capturing the network traffic and comparing the responses from the broker against the output from the Ruby AMQP client. The changes to support converting boolean values are listed in this pull request.
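
One way a converter can end up sending the wrong type is worth spelling out, as an illustration and not necessarily what happened inside librabbitmq: in Python, bool is a subclass of int, so a type check for integers also matches True and False. The sketch below uses hypothetical helper names and the single-letter field tags 't' (boolean) and 'I' (32-bit integer).

```python
def field_tag_naive(value):
    """Integer check only: booleans fall through as integers."""
    if isinstance(value, int):
        return "I"
    raise TypeError("unsupported type")


def field_tag_fixed(value):
    """Check bool first, because isinstance(True, int) is also true."""
    if isinstance(value, bool):
        return "t"
    if isinstance(value, int):
        return "I"
    raise TypeError("unsupported type")


assert isinstance(True, int)
assert field_tag_naive(True) == "I"   # boolean mislabeled as an integer
assert field_tag_fixed(True) == "t"   # boolean tagged correctly
assert field_tag_fixed(1) == "I"      # plain integers are unaffected
```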

There were other minor bug fixes I had to make to the librabbitmq module, such as preventing segmentation faults when using integer values. The full set of changes to librabbitmq is in this pull request. There is still more work to be done before the AMQP extensions are fully supported, so these changes are just the start. I hope that this writeup serves as a guide for those less familiar with how Python C extension modules work and motivates others to make similar improvements too!


