PHP RabbitMQ Work Queue

View Project Source: https://github.com/jessecascio/jessesnet

Message queues, or job queues, allow web applications to control when and where tasks are performed. By distributing CPU or I/O intensive jobs over a server cluster, web applications can significantly improve performance, while maintaining a highly scalable infrastructure. It is essential to any message queue to have a reliable, flexible message server to handle message control, routing, etc. RabbitMQ is a robust message broker offering numerous configurations and options, and is proven to work well in high workload environments. For this project I will set up a simple PHP RabbitMQ work queue to demonstrate how a backend job queue could be architected and scaled.

The goal of this project was to build a small framework to demonstrate how to architect a basic PHP RabbitMQ job queue. The RabbitMQ website offers numerous tutorials in a variety of languages; however, the PHP examples are primarily procedural code snippets. I wanted to demonstrate how those concepts can be applied to a modern, object-oriented PHP code base. Some of the key concepts included:

Modern Layout - The key components of a well written PHP application are included: Bootstrapping, namespacing, dependency management via composer, autoloading, folder structure, unit testing, etc.

Dependency Injection - Most of the RabbitMQ classes are reused throughout the application. By introducing dependency injection, the code base is being decoupled from the underlying RabbitMQ classes.

Singleton Pattern - Like a database, a connection must be made to RabbitMQ. Using a singleton pattern, I am ensuring that only a single connection will be made at a time, while avoiding static functions to help with testing.

Factory Pattern - The job queue needs to handle various types of jobs. A Worker Factory produces the worker for each job type. The mapping from job type to worker class is defined in an XML config file, so new workers can be added to the system without changing the existing code.

Decorator Pattern - All the worker classes implement a common interface, allowing for type hinting in method parameters as well as making the unit testing cleaner.

Unit Testing - Some simple unit tests were included to verify the functionality of the core classes and to demonstrate how unit testing can be used in testing the code base. Some of the tests required mocking of the RabbitMQ wrapper classes.

PHP Libraries - Common PHP libraries were used, such as Symfony's ContainerBuilder and Monolog, to aid in rapid development.

For this write up I will do an overview of the various code pieces written for the work queue. For a more in-depth overview of the various RabbitMQ classes and functions, check the PHP RabbitMQ Job Queue blog post.

Architectural Overview

The problem we are trying to solve is allowing for distributed processing of jobs over a cluster of servers, taking strain off web servers and allowing for more control over where and when intensive work is done. In a web application, there would be some process (user interaction, cron, etc.) which would collect data that needed to be processed. Instead of having that data be processed immediately, it would be placed into a queue for future processing. There would be separate processes, referred to here as daemon processes, which monitor the queue and pull out the data as it is inserted. These processes could be on different servers, spreading the workload across a cluster. RabbitMQ is the message broker, where the data is saved and where the workers pull the data from. RabbitMQ has nothing to do with the processing or analysis of the data; it simply holds the data (the messages) until they are consumed. There are numerous different queue architectures, but for this example I am building a FIFO (first in, first out) queue. This code lays a foundation which could be built upon to make more advanced queue structures.
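
The producer/consumer flow described above can be illustrated without a broker. The following is only a sketch: PHP's SplQueue stands in for RabbitMQ, and the job names are made up for illustration. It shows the FIFO property, where jobs come out in the order they went in.

```php
<?php
// SplQueue stands in for the broker; RabbitMQ itself is not involved here.
$queue = new SplQueue();

// Producer: a web request or cron task enqueues work instead of doing it inline.
foreach (['resize-image', 'send-email', 'build-report'] as $job) {
    $queue->enqueue($job);
}

// Consumer: a background process drains the queue in arrival order.
$handled = [];
while (!$queue->isEmpty()) {
    $handled[] = $queue->dequeue();
}
```

In the real system the producer and consumer are separate processes, possibly on separate servers, and the broker sits between them.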

Code Installation

Getting the code up and running only requires a couple of things. RabbitMQ must be installed and running, and having the management UI is helpful for monitoring the queue's status. Once the code is cloned from GitHub, the vendor files need to be installed via Composer, and the config file can be copied and updated if RabbitMQ needs a custom host, port, or user.

cd /var/www
git clone git@github.com:jessecascio/jessesnet.git
cd jessesnet/portfolio/rabbitmq-php-basic
composer install

# for custom RabbitMQ vars, copy and update config
cp app/config/config.development.php app/config/config.php

Once set up, verify that the code can communicate with RabbitMQ:

# from rabbitmq-php-basic/
app/bin/verify.php

Successfully connected to queue
0 worker processes running

Any error messages should be addressed before continuing. The likely cause is the port, host, or credentials for RabbitMQ which are defined in app/config/config.development.php.

Service Container

The service container is an instance of Symfony's DependencyInjection\ContainerBuilder object, allowing access to key classes throughout the application without having to instantiate them directly. The Service\Container class is where services are registered into the ContainerBuilder object, and it allows retrieval of single services or the entire container object. Because classes ask the container for their collaborators instead of instantiating them, the code that uses a service is decoupled from the concrete class that provides it.
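
To make the idea concrete without pulling in Symfony, here is a minimal hand-rolled container. It is a stand-in written for this explanation, not Symfony's ContainerBuilder API; the names TinyContainer and ArrayLogger are hypothetical. It shows the core behavior: services are registered by id, built lazily, and reused on later requests.

```php
<?php
// Hypothetical stand-in for a DI container; this is NOT Symfony's ContainerBuilder API.
final class TinyContainer
{
    /** @var array<string, callable> factories keyed by service id */
    private array $factories = [];
    /** @var array<string, object> services that have already been built */
    private array $instances = [];

    public function register(string $id, callable $factory): void
    {
        $this->factories[$id] = $factory;
        unset($this->instances[$id]);
    }

    public function get(string $id): object
    {
        if (!isset($this->factories[$id])) {
            throw new InvalidArgumentException("Unknown service: $id");
        }
        // Build lazily on first request, then reuse the same instance.
        return $this->instances[$id] ??= ($this->factories[$id])($this);
    }
}

// Illustrative service: a logger stub standing in for something like Monolog.
final class ArrayLogger
{
    public array $lines = [];
    public function info(string $msg): void { $this->lines[] = $msg; }
}

$container = new TinyContainer();
$container->register('logger', fn () => new ArrayLogger());

// Callers depend on the service id, not on the concrete class.
$container->get('logger')->info('daemon started');
```

Swapping ArrayLogger for another class only requires changing the registration; the calling code stays the same.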

Broker Client

There is a composer library written to interact with the RabbitMQ broker which will be used in the code base:
"videlalvaro/php-amqplib": "2.2.*"

However, instead of instantiating the objects directly throughout the code, I use wrapper classes allowing for future library changes to require minimal code updates.

When working with RabbitMQ it is best to have a single connection to the broker, and have each process create, consume, and destroy their own channels to the broker. I used a combination of the dependency injector and a singleton design pattern to enforce this concept. By having a private constructor the RabbitClient can prevent direct instantiation.

src\Broker\RabbitClient.php

private function __construct()
{
  // create connections, constants defined via Bootstrap
  $this->AMQPConnection = new AMQPConnection(RABBIT_SERVER, RABBIT_PORT, RABBIT_USER, RABBIT_PWD);
  $this->AMQPChannel = $this->AMQPConnection->channel();
}

This was then incorporated into a dependency injection container, as the connection was needed throughout the application. By declaring factory methods for the class on the container and tracking the instance inside of RabbitClient, I ensure that the same instance will always be returned from the service container.

src\Service\Container.php

private function buildContainer()
{
  // rabbitmq client
  $this->ContainerBuilder->register('rabbit-client')
    ->setFactoryClass("Broker\RabbitClient")
    ->setFactoryMethod("getInstance");
}

This is tested in test\Broker\RabbitClientTest:

public function testSingleConnection()
{
  $RabbitClient1 = $this->DIContainer->get('rabbit-client');
  $RabbitClient2 = $this->DIContainer->get('rabbit-client');

  $this->assertSame($RabbitClient1->getConnection(), $RabbitClient2->getConnection());
}
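
The test above relies on getInstance handing back the same object on every call. The post does not show that method, so the following is a minimal sketch of how such a singleton could look. FakeConnection and RabbitClientSketch are hypothetical names, and the fake connection replaces the real AMQP connection so the example runs without a broker.

```php
<?php
// Hypothetical stand-in for the AMQP connection so the sketch runs without a broker.
final class FakeConnection
{
    public static int $opened = 0;
    public function __construct() { self::$opened++; }
}

final class RabbitClientSketch
{
    private static ?RabbitClientSketch $instance = null;
    private FakeConnection $connection;

    // Private constructor: callers cannot use `new`, so they must go through getInstance().
    private function __construct()
    {
        $this->connection = new FakeConnection();
    }

    public static function getInstance(): RabbitClientSketch
    {
        // Create the client on first use and hand back the same object afterwards.
        if (self::$instance === null) {
            self::$instance = new self();
        }
        return self::$instance;
    }

    public function getConnection(): FakeConnection
    {
        return $this->connection;
    }

    // Block cloning, which would otherwise produce a second instance.
    private function __clone() {}
}

$a = RabbitClientSketch::getInstance();
$b = RabbitClientSketch::getInstance();
```

Both variables refer to one object, and only one connection is ever opened in the process.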

The RabbitMQ connection is required for all interactions with the queue and is now available throughout the application. The dependency injector and wrapper classes decouple the application from the underlying implementation details, and the singleton ensures a process can never create multiple connections to the queue.

Daemon Processes

The processes used to monitor the queue are referred to as daemons: continually running scripts that run in the background, monitor the job queue, and delegate the work as it becomes available. There is only a single daemon class for this queue, located in Proc\Daemon; however, numerous daemon processes can run on a single machine. Using the dependency injector, the daemon class is able to get the abstracted classes to interact with RabbitMQ. Once the daemon connects to the queue via its channel, the process idly waits for messages to be dropped into the message queue.

Workers

Once the daemon process pulls data from the queue it needs to know what to do with it. This logic is abstracted out into the concept of workers. The workers are the classes that actually process the data from the queue. When the daemon gets the data, it is passed to the RabbitHandler to delegate the work:

$AMQChannel->basic_consume(RabbitClient::DEFAULT_QUEUE, '', false, false, false, false, [$RabbitHandler, 'consume']);

The RabbitHandler is the object which determines which class should receive the data. By abstracting the message handler, I am able to encapsulate the worker retrieval and job processing outside of the daemon class, as the daemon's only role should be to monitor the queue. Also, having a handler class keeps the daemon code minimal and allows for different daemon types to reuse the handler logic. The RabbitHandler must be tested using mock objects, test\Broker\RabbitHandlerTest, as objects are passed into the handler and used internally. Depending on the job_type of the message, the handler is able to instantiate the correct class via the Proc\Factory class.

$Worker = Factory::get($body['job_type'], $body);
$Worker->run(); 

Because every worker implements a common interface, all workers are treated the same and are known to be able to process the data from the queue. The Factory knows which worker class to return based on the job_type, which is mapped to a class in the XML config file, app\config\proc\workers.xml. This allows new worker types to be added without changes to the existing code. The Factory loads the XML only once and keeps it in memory, so the daemon processes need to be restarted when new worker types are added. This could be avoided by reloading the config periodically.
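
A config-driven factory of this kind can be sketched as follows. The XML layout, WorkerInterface, EmailWorker, and WorkerFactorySketch are all assumptions made for illustration; the actual workers.xml schema and the Proc\Factory internals are not shown in this post. The XML is inlined so the example runs on its own.

```php
<?php
// Assumed interface; the post says workers share one but does not show it.
interface WorkerInterface
{
    public function run(): string;
}

final class EmailWorker implements WorkerInterface
{
    private array $data;
    public function __construct(array $data) { $this->data = $data; }
    public function run(): string { return 'email to ' . $this->data['to']; }
}

final class WorkerFactorySketch
{
    /** @var array<string, string>|null job_type => class name, loaded once per process */
    private static ?array $map = null;

    public static function load(string $xml): void
    {
        $doc = simplexml_load_string($xml);
        if ($doc === false) {
            throw new RuntimeException('Could not parse worker config');
        }
        self::$map = [];
        foreach ($doc->worker as $worker) {
            self::$map[(string) $worker['type']] = (string) $worker['class'];
        }
    }

    public static function get(string $jobType, array $data): WorkerInterface
    {
        $class = self::$map[$jobType] ?? null;
        // Only classes that implement the worker interface may be instantiated.
        if ($class === null || !is_subclass_of($class, WorkerInterface::class)) {
            throw new InvalidArgumentException("No worker configured for: $jobType");
        }
        return new $class($data);
    }
}

// Hypothetical config layout, inlined here instead of being read from workers.xml.
$xml = <<<XML
<workers>
  <worker type="email" class="EmailWorker"/>
</workers>
XML;

WorkerFactorySketch::load($xml);
$body = ['job_type' => 'email', 'to' => 'user@example.com'];
$result = WorkerFactorySketch::get($body['job_type'], $body)->run();
```

Adding a new job type then means writing one worker class and adding one line of XML. Because the map lives in a static property, a running process keeps its old map until load() is called again, which is why the daemons need a restart or a periodic reload.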

Once the jobs are processed, regardless of whether there are errors, the messages are removed from the queue via the acknowledge() function call. This prevents error-prone jobs from clogging up the queue.
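
One way to guarantee the acknowledgement happens on both the success and failure paths is a try/finally block. This is a sketch under assumptions: FakeMessage and HandlerSketch are hypothetical stand-ins for the project's wrapper classes, and the JSON payload format is assumed since the post does not say how messages are serialized.

```php
<?php
// Hypothetical message stub; the real project wraps the AMQP message instead.
final class FakeMessage
{
    public bool $acked = false;
    public string $body;
    public function __construct(string $body) { $this->body = $body; }
    public function acknowledge(): void { $this->acked = true; }
}

final class HandlerSketch
{
    /** @var callable(array): void */
    private $process;
    public array $errors = [];

    public function __construct(callable $process) { $this->process = $process; }

    public function consume(FakeMessage $message): void
    {
        try {
            $body = json_decode($message->body, true);
            if (!is_array($body)) {
                throw new UnexpectedValueException('Malformed job payload');
            }
            ($this->process)($body);
        } catch (Throwable $e) {
            // Record the failure; a real system would likely log it.
            $this->errors[] = $e->getMessage();
        } finally {
            // Acknowledge in every case so a failing job is never redelivered.
            $message->acknowledge();
        }
    }
}

$handler = new HandlerSketch(function (array $body): void {
    if ($body['job_type'] === 'bad') {
        throw new RuntimeException('worker failed');
    }
});

$ok  = new FakeMessage(json_encode(['job_type' => 'good']));
$bad = new FakeMessage(json_encode(['job_type' => 'bad']));
$handler->consume($ok);
$handler->consume($bad);
```

The trade-off is that a failed job is dropped rather than retried, so the error record is the only trace that it ever existed.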

Supervisor

Whenever dealing with processes that are running in the background it is important to be able to control and access them. The Supervisor, Service\Supervisor, was created to create, check, and stop the daemon processes. To interact with the Supervisor there is a command line PHP script, app\bin\proc.php. To start the daemons, the Supervisor simply calls the exec() function to start the PHP processes in the background. There is logic built in to prevent the accidental creation of too many processes. To stop the process the Supervisor just kills the processes. In a production application there would have to be a more graceful way to stop the daemons to ensure they are not killed in the middle of processing a job.
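
A more graceful stop could be built on a flag that the daemon checks between jobs, so a job in progress always finishes before the process exits. The sketch below is not part of the project: DaemonLoopSketch is a hypothetical name, the job list stands in for messages from the queue, and it assumes the Supervisor would send SIGTERM rather than forcibly killing the process. The signal wiring only runs when the pcntl extension is available.

```php
<?php
final class DaemonLoopSketch
{
    private bool $stopRequested = false;

    // Called from a signal handler (or a test) to ask the loop to finish up.
    public function requestStop(): void
    {
        $this->stopRequested = true;
    }

    /**
     * @param iterable<string> $jobs stands in for messages arriving from the queue
     * @param callable(string): void $process the work to do for each job
     * @return string[] the jobs that were fully processed
     */
    public function run(iterable $jobs, callable $process): array
    {
        $done = [];
        foreach ($jobs as $job) {
            // The flag is checked only between jobs, so a job in progress always completes.
            if ($this->stopRequested) {
                break;
            }
            $process($job);
            $done[] = $job;
        }
        return $done;
    }
}

$daemon = new DaemonLoopSketch();

// Wire the flag to SIGTERM when the pcntl extension is available (CLI on Unix-like systems).
if (function_exists('pcntl_async_signals')) {
    pcntl_async_signals(true);
    pcntl_signal(SIGTERM, function () use ($daemon): void {
        $daemon->requestStop();
    });
}

// Simulate a stop request arriving while job-2 is being processed.
$done = $daemon->run(['job-1', 'job-2', 'job-3'], function (string $job) use ($daemon): void {
    if ($job === 'job-2') {
        $daemon->requestStop();
    }
});
```

Here job-2 completes even though the stop request arrives while it is running, and job-3 is left in the queue for another daemon.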

To interact with the daemons:

# check how many processes are running
app/bin/proc.php -c status

# start daemons
app/bin/proc.php -c start -p 5
app/bin/proc.php -c start -p 3
app/bin/proc.php -c status

# kill the processes
app/bin/proc.php -c kill

When there are daemon processes running, this can be confirmed via the RabbitMQ management UI by the number of consumers and connections. The consumers are the daemons, so the consumer count should match the number of daemon processes running.

Running the Code

Before running the code it is beneficial to be able to log into the management UI at 127.0.0.1:15672. To simulate jobs and create the daemons, use the script app\run.php. After creating the jobs, log into the management UI and verify the messages were placed into the queue and successfully removed. The daemon outputs status updates to logs\output.log. The script will kill any daemon processes running and clear the log before proceeding.

# create 100,000 jobs with 3 daemon processes
app/run.php -j 100000 -p 3

# create 50,000 jobs with 2 daemon processes
app/run.php -j 50000 -p 2

By watching the management UI you can see the jobs being populated and being processed by looking at the graphs for the job_queue queue. Also a log is kept in logs/output.log of the PIDs of the workers and what they are doing.

Conclusion

Using this code sample I was able to demonstrate how to set up a basic PHP RabbitMQ work queue. Although the code is not intended for production use as is, the examples should give enough guidance to build a production-ready PHP work queue system. This code just outlines a basic queue, but will be used in future projects to outline more advanced queue setups. Some additional considerations:

How will this distribute - Currently starting workers is only done via a command line script. The RabbitMQ broker could be installed on a server and any number of other servers could be running numerous daemon processes. When setting up such a system, having to log into every server to manually start/stop processes would not be ideal. There needs to be some form of centralized control, for managing and monitoring daemons across servers.

More advanced queues - Incorporating things such as fanout exchanges, advanced routing, RPC, etc., can make queues much more robust and adaptable to a variety of use cases. Also, things such as message persistence and job priority were not discussed in this overview, only implemented in the code. Before designing a queue, all of those factors should be considered.

Alternative data store - While RabbitMQ is extremely robust, you may not want to be passing around large amounts of data over your network. For example, let's say one of the queue's primary jobs is large-scale data analysis. There is no need to send all the data to the queue just to be pulled back out every time. Other database solutions, such as MongoDB or MySQL, can be used in conjunction with RabbitMQ to offer an alternative data store. The queue could simply carry keys, commands, or queries that describe what data should be accessed and what should be done with it.
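
As a small illustration of passing a reference instead of the data, the sketch below uses a plain array as a stand-in for the external data store. The key name, the 'sum' job type, and the JSON message format are all made up for this example.

```php
<?php
// Hypothetical in-memory store standing in for MongoDB or MySQL.
$store = ['dataset:42' => range(1, 1000)];

// Producer side: publish only a key and a command, not the data itself.
$message = json_encode(['job_type' => 'sum', 'data_key' => 'dataset:42']);

// Worker side: decode the small message, then fetch the data from the store.
$job = json_decode($message, true);
$data = $store[$job['data_key']];
$total = array_sum($data);
```

The message stays a few dozen bytes regardless of how large the referenced dataset grows.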

One benefit of using RabbitMQ is that it is not language specific. Jobs can be entered by a PHP script, but processed by a Java or C++ script. This flexibility allows domain specific problems to be tackled by different tools. Considerations would have to be made on how to run daemons in different languages.