Thread carefully

November 13th, 2015 by Maxime Fabre

As far as I can remember, PHP has always had a terrible reputation at handling very heavy (or asynchronous) tasks. For a long while if you wanted to parallelize long tasks you had to resort to forking through pcntl_fork which had its own issues, and you couldn’t really handle the results of those tasks properly, etc.

As such, a habit has kind of developed where we go straight for more intricate solutions such as queuing (which just delays your task if anything), React PHP, or even using another language altogether. But PHP can do threading, and more importantly it’s a lot easier than you probably think.

In this article I’m going to dive into the pthreads extension (short for POSIX Threads). It has been around for a while (since 2012) but I feel like too many people forget it exists or assume it is going to be painful to use – mostly because the official documentation is rather slim about it.

For those not familiar with threading, the author of pthreads has a very nice summary of it which I’m just going to quote here:

Threading is about dividing your instructions into units of execution, and distributing those units among your processors and/or cores in such a way as to maximize the throughput of your application.

Threading does not fit every use case: you can’t just parallelize all the things and expect them to be faster, PHP or not. It is reserved for specific cases, but in those situations you’ll now be able to do it in PHP, with your existing domain logic.

Getting to know pthreads

Setup

First of all, we need to make sure we have the extension. If you’re running the Homebrew version of PHP, nothing could be simpler: you install it like any other extension:

brew install php56-pthreads

One small note: pthreads requires PHP itself to be compiled with thread safety, which is why it needs the --with-thread-safety flag. As such, once it’s done installing you will effectively be running a different build of PHP, called the ZTS (Zend Thread Safety) version. Most extensions shouldn’t care, but if you do get warnings like this when running php --version:

Error something: mcrypt was compiled with a different version of PHP

Simply reinstall the extension, for example brew reinstall php56-mcrypt, and you should be good to go. If you have the Blackfire extension enabled, I also recommend disabling it for now, as its ZTS support is still experimental and you might encounter some issues.
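Before going further, it can help to confirm that the PHP binary you are running is actually the thread-safe one with pthreads loaded. The sketch below is plain PHP and relies only on the core PHP_ZTS constant and extension_loaded(); the function name pthreadsReadiness is made up for illustration.

```php
<?php
/**
 * Hypothetical helper: reports whether this PHP binary can run pthreads code.
 * PHP_ZTS is a core constant that is truthy on thread-safe (ZTS) builds;
 * 'pthreads' is the name under which the extension registers itself.
 */
function pthreadsReadiness()
{
    return [
        'zts'      => defined('PHP_ZTS') && PHP_ZTS,
        'pthreads' => extension_loaded('pthreads'),
    ];
}

foreach (pthreadsReadiness() as $check => $ok) {
    echo str_pad($check, 10).($ok ? 'yes' : 'no').PHP_EOL;
}
```

If pthreads reports yes, zts should too, since the extension cannot be loaded into a non-thread-safe build.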

Nomenclature

The pthreads extension has a lot of different concepts that aren’t necessarily explained very well and that can prove to be confusing at first. All you need to know is that there are three types of classes provided by the extension:

  • Threaded objects are tasks that can be threaded, they’re your jobs, what you want to execute asynchronously.
  • Workers are the classes responsible for running those jobs and synchronizing their results.
  • Pools are responsible for managing multiple workers and dispatching jobs between them.

And that’s all. Now, the extension provides a handful of classes, but they simply derive from those:

As you can see in the diagram, at its core almost every class is a child of Threaded, the base class of the extension (Pool is the notable exception: it manages workers but does not itself extend Threaded). Nonetheless, you will rarely use Threaded by itself, as its children provide more convenient methods and are generally easier to deal with.

A simple asynchronous example

Note: for brevity’s sake I won’t declare properties on the classes in the examples nor add docblocks, but you totally should. If you don’t, I will find you, and I will kill you.

Let’s start off easy with a simple web crawling example. We’ll start by creating our job, a class extending Thread, which itself extends Threaded. All classes extending Threaded must have a run method defining the task to thread; in our case it’ll simply be a file_get_contents of a Google search:

<?php
class SearchGoogle extends Thread
{
    public function __construct($query)
    {
        $this->query = $query;
    }

    public function run()
    {
        $this->html = file_get_contents('http://google.fr?q='.$this->query);
    }
}

Let’s create an instance of SearchGoogle to search for cats, for example. To start the job, we call its start method. It only returns a boolean telling us whether the thread was started, not the job’s result; the job itself runs in a separate thread.

<?php
$job = new SearchGoogle('cats');
$job->start();

Now once a job is started, there are several methods you can call to check on it, such as $job->isRunning(), but the one we want is the join method which will make the parent thread wait for the task to be finished and “join it back” into the main thread. Basically this is what is happening:

Once join returns, we can be sure the object is holding our results:

<?php
$job = new SearchGoogle('cats');
$job->start();

// Wait for the job to be finished and print results
$job->join();
echo $job->html;

With that in mind, we can now, for example, start multiple searches at the same time and get the results:

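<?php
$searches = ['cats', 'dogs', 'birds'];

// Keep the jobs in their own array so the second loop
// cannot overwrite anything through a leftover reference
$jobs = [];
foreach ($searches as $query) {
    $job = new SearchGoogle($query);
    $job->start();
    $jobs[] = $job;
}

foreach ($jobs as $job) {
    $job->join();
    echo substr($job->html, 0, 20).PHP_EOL;
}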
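When a loop iterates by reference (foreach ($array as &$value)), $value stays bound to the array’s last element after the loop ends. Reusing that same variable name in a later by-value loop then silently overwrites the last element; with jobs, that means one job is joined twice and another is never joined. The plain-PHP sketch below (no pthreads needed; convertThenReuse is a made-up name) reproduces the effect and shows that unset() breaks the binding.

```php
<?php
/**
 * Hypothetical demo of the foreach-by-reference trap.
 * $unsetReference controls whether the reference is released
 * between the two loops.
 */
function convertThenReuse(array $items, $unsetReference)
{
    // First pass: modify each element in place through a reference
    foreach ($items as &$item) {
        $item = strtoupper($item);
    }

    if ($unsetReference) {
        // Break the link between $item and the last array slot
        unset($item);
    }

    // Second pass: a by-value loop reusing the same variable name.
    // Merely iterating writes each value into the last slot.
    foreach ($items as $item) {
    }

    return $items;
}

var_dump(convertThenReuse(['cats', 'dogs', 'birds'], false)); // CATS, DOGS, DOGS
var_dump(convertThenReuse(['cats', 'dogs', 'birds'], true));  // CATS, DOGS, BIRDS
```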

If we print out a timestamp within our job class:

public function run()
{
    echo microtime(true).PHP_EOL;

    $this->html = file_get_contents('http://google.fr?q='.$this->query);
}

And run our file, we’ll see that all three jobs indeed started at the same time:

$ php multiple.php
1446987102.4479
1446987102.4503
1446987102.4525

<!doctype html><html
<!doctype html><html
<!doctype html><html
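The SearchGoogle job builds its URL by plain string concatenation, which works for single words like cats but breaks as soon as a query contains a space or an &. A minimal sketch of a safer builder, keeping the article’s http://google.fr base URL; searchUrl is a hypothetical name, while http_build_query is the standard PHP function that URL-encodes each value.

```php
<?php
/**
 * Hypothetical helper: builds the search URL with an encoded query string.
 * http_build_query() encodes spaces as '+' and '&' as '%26' by default.
 */
function searchUrl($query)
{
    return 'http://google.fr?'.http_build_query(['q' => $query]);
}

echo searchUrl('cats').PHP_EOL;        // http://google.fr?q=cats
echo searchUrl('cats & dogs').PHP_EOL; // http://google.fr?q=cats+%26+dogs
```

Inside a job’s run method you would then call file_get_contents(searchUrl($this->query)).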

Managing jobs with workers

Now this was relatively easy, but ideally you don’t want to manage your jobs yourself, i.e. you don’t want to start each of them individually and then join them one by one. You might just want to throw your jobs somewhere, let them do their thing, and get their results once all of them are done. That’s where workers come into play.

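A Worker is itself a thread on top of which you can stack jobs, so you can start and join all of them at once. Keep in mind that one worker is one thread: it runs its stacked jobs one after another, not in parallel.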

<?php
class Searcher extends Worker
{
    public function run()
    {
        echo 'Running '.$this->getStacked().' jobs'.PHP_EOL;
    }
}

// Stack our jobs on our worker
$worker   = new Searcher();
$searches = ['dogs', 'cats', 'birds'];
foreach ($searches as &$search) {
    $search = new SearchGoogle($search);
    $worker->stack($search);
}

// Start all jobs
$worker->start();

// Join all jobs and close worker
$worker->shutdown();

This would print out something like this:

Running 3 jobs
1446989108.5938
1446989108.6898
1446989108.9086

Gathering the results

Now, in order for our worker to keep track of the results of each of its jobs, we can simply add a method to it and call that method from the jobs. When you stack a job onto a worker, the job becomes aware of the worker and can access it through $this->worker. So let’s do just that, and gather the HTML fetched by our jobs:

<?php
class Searcher extends Worker
{
    public $data = [];

    public function run()
    {
        echo 'Running '.$this->getStacked().' jobs'.PHP_EOL;
    }

    /**
     * To avoid corrupting the array
     * we use array_merge here instead of just
     * $this->data[] = $html
     */
    public function addData($data)
    {
        $this->data = array_merge($this->data, [$data]);
    }
}

class SearchGoogle extends Threaded
{
    public function __construct($query)
    {
        $this->query = $query;
    }

    public function run()
    {
        echo microtime(true).PHP_EOL;

        $this->worker->addData(
            file_get_contents('http://google.fr?q='.$this->query)
        );
    }
}

// Stack our jobs on our worker
$worker   = new Searcher();
$searches = ['dogs', 'cats', 'birds'];
foreach ($searches as &$search) {
    $search = new SearchGoogle($search);
    $worker->stack($search);
}

// Start all jobs
$worker->start();

// Join all jobs and close worker
$worker->shutdown();
foreach ($worker->data as $html) {
    echo substr($html, 0, 20).PHP_EOL;
}

If we run this we’ll get the proper result:

Running 3 jobs
1446989506.0509
1446989507.938
1446989510.4684
<!doctype html><html
<!doctype html><html
<!doctype html><html
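Those timestamps tell you how the jobs were scheduled. A minimal sketch that computes the gap between consecutive start times, fed with the values printed earlier in this article (startGaps is a hypothetical helper, not part of pthreads): three separate Thread objects start about 2 ms apart, while three jobs stacked on a single worker start seconds apart, because the worker runs its stack one job at a time.

```php
<?php
/**
 * Hypothetical helper: seconds between consecutive job start times,
 * rounded to the 4 decimals shown in the printed output.
 */
function startGaps(array $timestamps)
{
    sort($timestamps);
    $gaps = [];
    for ($i = 1, $n = count($timestamps); $i < $n; $i++) {
        $gaps[] = round($timestamps[$i] - $timestamps[$i - 1], 4);
    }

    return $gaps;
}

// Three Thread objects started one after another (multiple.php)
print_r(startGaps([1446987102.4479, 1446987102.4503, 1446987102.4525])); // 0.0024, 0.0022

// Three jobs stacked on a single worker
print_r(startGaps([1446989506.0509, 1446989507.938, 1446989510.4684]));  // 1.8871, 2.5304
```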

Now, you may notice I’m not doing $worker->stack(new SearchGoogle($search)). This is because the worker has to keep track of references to its jobs, i.e. when you start the worker, every job you stacked onto it must still be referenced from the main thread. As this is rather cumbersome, a class was created for this: the Pool class. Let’s look into it:

Pooling jobs

A Pool is a class whose purpose is to dispatch jobs onto one or more workers, and manage those jobs. It is described as such in the documentation:

Pooling provides a higher level abstraction of the Worker functionality, including the management of references in the way required by pthreads.

Let’s rewrite our previous example with pooling. When you create a pool you can pass it three arguments:

  • The number of workers the pool should be able to use at the same time
  • The kind of worker the pool should use (a class name string)
  • Optional arguments to pass to the workers’ constructors when creating them

In our case we’ll use the native pool and worker classes, so our new instance will look like this:

<?php
$pool = new Pool(5, Worker::class);

We can then submit jobs to the pool by calling $pool->submit(<Threaded>). The main difference from workers is that a job starts as soon as you submit it to the pool, and you don’t have to handle references anymore:

<?php
// Create a pool and submit jobs to it
$pool = new Pool(5, Worker::class);
$pool->submit(new SearchGoogle('cats'));
$pool->submit(new SearchGoogle('dogs'));
$pool->submit(new SearchGoogle('birds'));

// Close the pool once done
$pool->shutdown();

If we do this, same as before, we’ll see all our jobs start at the same time. Everything works fine. Now let’s gather the results. To interact with the jobs in a pool without having to loop through workers and such, the pool has a convenient collect method which acts as a filter of sorts: you pass it a closure that returns true when a job can be collected (removed from the pool) and false when it should stay. Usually you return whether the job is done.

For this, pthreads provides a Collectable class which extends Threaded and which has two additional methods: setGarbage and isGarbage to mark a job as done and ready to be collected by the garbage collector. Let’s write our own Pool class that gathers the results from our jobs and then discards them:

<?php
class SearchPool extends Pool
{
    public $data = [];

    public function process()
    {
        // Run this loop as long as we have
        // jobs in the pool
        while (count($this->work)) {
            $this->collect(function (SearchGoogle $job) {
                // If a job was marked as done
                // collect its results
                if ($job->isGarbage()) {
                    $this->data[$job->query] = $job->html;
                }

                return $job->isGarbage();
            });
        }

        // All jobs are done
        // we can shutdown the pool
        $this->shutdown();

        return $this->data;
    }
}

That also means we need to edit our SearchGoogle job to extend Collectable and call setGarbage at the end of the run method:

<?php
class SearchGoogle extends Collectable
{
    public function __construct($query)
    {
        $this->query = $query;
    }

    public function run()
    {
        echo microtime(true).PHP_EOL;

        $this->html = file_get_contents('http://google.fr?q='.$this->query);
        $this->setGarbage();
    }
}

We can now do the following:

<?php
// Create a pool and submit jobs to it
$pool = new SearchPool(5, Worker::class);
$pool->submit(new SearchGoogle('cats'));
$pool->submit(new SearchGoogle('dogs'));
$pool->submit(new SearchGoogle('birds'));
$pool->submit(new SearchGoogle('planes'));
$pool->submit(new SearchGoogle('cars'));

$data = $pool->process();
var_dump($data);

If we run this, not only will we see that all 5 jobs started within a few milliseconds of each other, but the whole run took under a second of wall-clock time, and we were able to gather their results easily:

$ time php pooling.php
1446990493.7183
1446990493.7205
1446990493.7227
1446990493.7247
1446990493.7266
array(5) {
  'birds' => (53230) "<!doctype html>"...
  'cats' => (53211) "<!doctype html>"...
  'cars' => (53250) "<!doctype html>"...
  'dogs' => (53244) "<!doctype html>"...
  'planes' => (53267) "<!doctype html>"...
}
php pooling.php  0.51s user 0.03s system 100% cpu 0.940 total
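The direction of the boolean returned to collect is easy to get backwards. Here is a plain-PHP model of that contract; it is not pthreads’ actual implementation, and collectFinished plus the array-shaped jobs are invented for illustration, with a 'done' key standing in for isGarbage(). A true return removes the job, false keeps it.

```php
<?php
/**
 * Hypothetical plain-PHP model of a collect()-style filter:
 * every job for which $collector returns true is removed from $work.
 * Returns the number of jobs removed.
 */
function collectFinished(array &$work, callable $collector)
{
    $removed = 0;
    foreach ($work as $key => $job) {
        if ($collector($job)) {
            unset($work[$key]);
            $removed++;
        }
    }

    return $removed;
}

$work = [
    ['query' => 'cats', 'html' => '<!doctype html>', 'done' => true],
    ['query' => 'dogs', 'html' => null, 'done' => false],
];

$data = [];
collectFinished($work, function (array $job) use (&$data) {
    // Same shape as SearchPool::process(): harvest finished jobs, keep the rest
    if ($job['done']) {
        $data[$job['query']] = $job['html'];
    }

    return $job['done'];
});

print_r($data);                         // only the finished 'cats' job
print_r(array_column($work, 'query')); // 'dogs' is still waiting
```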

Autoloading and context inheritance

All these examples were rather basic, and when you start delving into more advanced examples you’ll most likely hit one common speed-bump: child threads don’t inherit classes autoloaded in their parent’s context.

To understand this, you have to grasp that child threads don’t necessarily inherit their parent’s context. In some cases inheriting it might even be inconvenient, when your job just has to do one small stand-alone task and you don’t want it to load all of the shit happening upstairs into memory.

For this you can pass options to the ->start() method of Thread instances, which limits how much is passed from their parent:

<?php
define('MY_CONSTANT', true);
function test() {}
class Foobar {}

class Example extends Thread
{
    public function run()
    {
        var_dump(defined('MY_CONSTANT'));
        var_dump(function_exists('test'));
        var_dump(class_exists('Foobar'));
    }
}

// true true true
$job = new Example();
$job->start(); // default argument is PTHREADS_INHERIT_ALL
$job->join();

// false false true
$job = new Example();
$job->start(PTHREADS_INHERIT_CLASSES);
$job->join();

// true true true
$job = new Example();
$job->start(PTHREADS_INHERIT_CLASSES | PTHREADS_INHERIT_CONSTANTS | PTHREADS_INHERIT_FUNCTIONS);
$job->join();

Now let’s require something through Composer, for example symfony/var-dumper, which is a nicer-looking wrapper around var_dump, and try to use the dump() function it provides in a job:

<?php
require 'vendor/autoload.php';

class Example extends Thread
{
    public function run()
    {
        dump('foobar');
    }
}

$job = new Example();
$job->start();
$job->join();

If you do this, it’ll fail with the following error:

Class 'Symfony\Component\VarDumper\VarDumper' not found

That’s because while the autoloader was set up in the main thread, it wasn’t in the child thread. There are multiple ways to circumvent this; the easiest one, of course, is to just require 'vendor/autoload.php' in the run method of your job, but that is pretty disgusting.

A nicer way to go about it is to create a Worker class dedicated to setting up autoloading before running its jobs. Same idea, but now you don’t have to require anything in any of your job classes:

<?php
class AutoloadingWorker extends Worker
{
    public function run()
    {
        require 'vendor/autoload.php';
    }
}

// Create our worker and stack our job on it
$worker = new AutoloadingWorker();

$job = new Example();
$worker->stack($job);

$worker->start();
$worker->shutdown(); // run the stacked job, then close the worker

// Or use a pool and specify our custom worker
$pool = new Pool(5, AutoloadingWorker::class);
$pool->submit(new Example());
$pool->shutdown();

Which would then yield the correct result:

$ php autoloading.php
"foobar"
"foobar"

The case of closures

So far we’ve been dealing with classes everywhere, but closures can also be a good source of headaches if you’re not aware that their context is not fully preserved. For example, here is some code that you’ll probably think should work:

<?php
class ClosureRunner extends Collectable
{
    public function __construct($closure)
    {
        $this->closure = $closure;
    }

    public function run()
    {
        $closure = $this->closure;
        $closure();

        $this->setGarbage();
    }
}

$foo = 'test';

$pool = new Pool(5, Worker::class);
$pool->submit(new ClosureRunner(function () use ($foo) {
    var_dump($foo);
}));

$pool->shutdown();

But it won’t: the closure is created in the main thread, and the $foo it captures with use belongs to that context. When the closure is handed to your job and run in the child thread, that captured state doesn’t come along with it. It all boils down to PHP’s difficulty serializing closures.
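You can observe the underlying limitation without pthreads at all: plain serialize() refuses closures outright. A minimal sketch (trySerialize is a hypothetical helper) comparing the closure from the example with the plain string it captures:

```php
<?php
/**
 * Hypothetical helper: returns 'ok' if $value serializes,
 * otherwise the message of the exception PHP threw.
 */
function trySerialize($value)
{
    try {
        serialize($value);

        return 'ok';
    } catch (Exception $e) {
        return $e->getMessage();
    }
}

$foo = 'test';

// The captured string on its own is fine...
echo trySerialize($foo).PHP_EOL; // ok

// ...but the closure wrapping it is rejected
echo trySerialize(function () use ($foo) {
    var_dump($foo);
}).PHP_EOL; // Serialization of 'Closure' is not allowed
```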

That being said, there is a convenience method to create jobs from closures with proper serialization: Threaded::from (which means it’s available in all its children). For example, the following would work:

<?php
$pool = new Pool(5, Worker::class);

$foo = 'test';
$pool->submit(Collectable::from(function () use ($foo) {
    var_dump($foo);
    $this->setGarbage();
}));

$pool->shutdown();

Synchronization and notifications

So far we haven’t paid much attention to what our child threads were doing, but a situation may arise where you need to coordinate what is happening in your main thread with what is happening in a child thread.

This is done through the wait and notify methods and related helpers such as isWaiting. Say, for example, we have something to do in the parent thread and some other task to do in parallel in a child thread, but we need the results from the child thread to complete our main task. We would do it like so:

<?php
class Job extends Thread
{
    public function run()
    {
        while (!$this->isWaiting()) {
            // Do some work here as long
            // as we're not waiting for parent
        }

        $this->synchronized(function () {
            $this->result = 'DONE';
            $this->notify();
        });
    }
}

$job = new Job();
$job->start();

// Do some operation in the main thread here
sleep(1);

// Notify the child thread that we need it
// and get its results
$job->synchronized(function ($job) {
    $job->wait();
    echo $job->result;
}, $job);

Note how we wrap every call to wait and notify in a $job->synchronized() call: this ensures everything is synchronized when we make these calls and that both parties are ready.

So, we do our work in the main thread, then synchronize with the job and wait for a notification, which it then sends. This is different from simply calling $job->join(), as your child thread won’t be merged back into the main thread: it continues to live its life, and you can do things after your synchronized call. Going back to our previous diagram, here’s what a notify/wait routine would look like:

This might not seem very useful at first, but it enables a lot of things. For example, we can create a very stripped-down equivalent of promises, with a Promise class that runs its closure in a child thread and then reports the result to the main thread:

<?php
class Promise extends Thread
{
    public function __construct(Closure $closure)
    {
        $this->closure = $closure;
        $this->done    = false;
        $this->start();
    }

    public function run()
    {
        $this->synchronized(function () {
            $closure = $this->closure;

            $this->result = $closure();

            // Use a dedicated flag: a falsy result ('' or false)
            // must not make then() wait forever
            $this->done = true;
            $this->notify();
        });
    }

    public function then(callable $callback)
    {
        return $this->synchronized(function () use ($callback) {
            // Loop to guard against waking up before the result is set
            while (!$this->done) {
                $this->wait();
            }

            return $callback($this->result);
        });
    }
}

$promise = new Promise(function () {
    return file_get_contents('http://google.fr');
});

$promise->then(function ($results) {
    echo $results;
});

This also highlights something essential: wait and notify are not tied to a parent/child relationship. Any Threaded object can serve as the meeting point, so a child thread can wait on its own object or on another child thread’s, and the main thread can notify any of them.

Bonus round: threading a command bus

All the examples we’ve seen previously were very basic for the sake of comprehension. But the tasks you’d ideally want to thread are most likely much bigger than that, have more complex dependencies, and so on.

A common setup for modern PHP projects is to be backed by a command bus (Laravel’s, Tactician, whatever floats your boat). Threading can easily be integrated into this context through a custom Worker class that receives the command bus instance and executes the commands, like so:

<?php
use League\Tactician\CommandBus;

class CommandBusWorker extends Worker
{
    public function __construct(CommandBus $bus)
    {
        $this->bus = $bus;
    }

    public function run()
    {
        require 'vendor/autoload.php';
    }

    public function handle($command)
    {
        return $this->bus->handle($command);
    }
}

class CommandJob extends Collectable
{
    public function __construct($command)
    {
        $this->command = $command;
    }

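    public function run()
    {
        $this->worker->handle($this->command);

        // Mark the job as done so the pool can collect it
        $this->setGarbage();
    }
}

// Tactician's CommandBus is built from an array of middleware
$pool = new Pool(5, CommandBusWorker::class, [new CommandBus($middleware)]);

$pool->submit(new CommandJob(new Command($some, $arguments)));
$pool->submit(new CommandJob(new AnotherCommand($some, $arguments)));

// Wait for both commands to be handled, then close the workers
$pool->shutdown();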

And there you have it, your two commands would be threaded and would run in parallel, with all the benefits of the command bus on top of it. You could even create a special pool that would eliminate all the boilerplate:

<?php
class CommandBusPool extends Pool
{
    public function __construct(CommandBus $bus)
    {
        parent::__construct(5, CommandBusWorker::class, [$bus]);
    }

    public function submit($command)
    {
        return parent::submit(new CommandJob($command));
    }
}

// Tactician's CommandBus is built from an array of middleware
$pool = new CommandBusPool(new CommandBus($middleware));
$pool->submit(new Command($some, $argument));
$pool->shutdown();

One thing to watch out for, though: any dependencies passed to a worker like we’re doing here need to be serializable if they are not children of Threaded (or scalars). That means they must not have any properties containing closures. If they do, the simplest way around it is to recreate the instances in the child thread (through a container and service providers, for example), or to use any other technique that resolves the command bus instance within the child thread.
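A cheap way to find out whether a dependency is safe to hand to a worker is to try serializing it before building the pool. A minimal plain-PHP sketch, assuming a hypothetical ClosureHoldingBus class that stands in for a bus keeping a closure in one of its properties; isSerializable is likewise a made-up name.

```php
<?php
/**
 * Hypothetical guard: true if $dependency survives serialize(),
 * which is what happens to non-Threaded values handed to a worker.
 * Resources do not throw here; they are silently written out as 0.
 */
function isSerializable($dependency)
{
    try {
        serialize($dependency);

        return true;
    } catch (Exception $e) {
        return false;
    }
}

// Hypothetical stand-in for a command bus with a configurable handler
class ClosureHoldingBus
{
    public $handler;

    public function __construct($handler)
    {
        $this->handler = $handler;
    }
}

var_dump(isSerializable(new ClosureHoldingBus('strtoupper')));   // bool(true)
var_dump(isSerializable(new ClosureHoldingBus(function () {}))); // bool(false)
```

Running such a check once at boot turns a confusing failure inside a child thread into an early, explicit one.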

On resources, and other pitfalls

Resources

The command bus example highlights something you should absolutely watch out for: resources cannot be passed from the main thread to child threads. This is due to limitations in PHP’s inner workings, and while this is being worked on and will work in some edge cases, it is recommended to steer clear of passing resources around between threads.

What do I mean by resources? I mean sockets, database connections, streams and so on. If you need your database in a child thread, create a worker that connects to your database in its own context. If you need a socket, same thing, connect to the socket within the child thread instead of in the main thread.
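Part of the reason is visible in plain PHP: a resource such as a stream handle cannot be serialized, and serialize() does not even complain; it quietly writes it out as the integer 0. A minimal sketch (roundTrip is a hypothetical helper) using an in-memory stream:

```php
<?php
/**
 * Hypothetical helper: what a value looks like after being
 * serialized and unserialized, as non-Threaded data is.
 */
function roundTrip($value)
{
    return unserialize(serialize($value));
}

$handle = fopen('php://memory', 'r+');
fwrite($handle, 'hello');

var_dump(serialize($handle)); // string(4) "i:0;"
var_dump(roundTrip($handle)); // int(0): the stream did not make the trip

fclose($handle);
```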

There are ways to circumvent this, but you’ll save yourself major headaches by not trying to. The more you think of your child threads as completely separate contexts, the better off you’ll be.

Data corruption

Another pitfall (which I’ve ignored in this article’s examples for brevity) is data corruption. Say your worker holds an array which your jobs access. If two jobs modify it at the same time, even in a way that seems safe, your array will get corrupted. This is not a limitation of pthreads in itself but an issue with threading in general: if two threads modify the same value at the same time, shit happens. To get around this, pthreads provides a Stackable class which you can extend to create thread-safe data structures. You use it like this:

<?php

class Results extends Stackable
{
}

class Job extends Thread
{
    public function __construct($test)
    {
        $this->test = $test;
        $this->start();
    }

    public function run()
    {
        $this->test[] = rand(0, 10);
    }
}

$results = new Results();

$jobs = [];
for ($i = 0; $i < 100; $i++) {
    $jobs[] = new Job($results);
}

foreach ($jobs as $job) {
    $job->join();
}

// Should print 100
var_dump(count($results));

If you run this, you’ll get 100 as the count, as expected. But try with a standard array instead ($results = []) and you will get 0, every time. That’s because any structure not extending Threaded is serialized when passed to the thread and no longer references the original variable: while its value will be correct in the child threads, the main thread never sees the changes.
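The same copy-versus-share distinction can be reproduced in a single thread. In the plain-PHP analogy below (no pthreads involved; ArrayObject merely stands in for a shared structure), writes made through the same object handle are visible everywhere, while a serialized copy, which is what a non-Threaded value becomes, diverges from the original.

```php
<?php
// Single-threaded analogy only: ArrayObject plays the role of shared data.
$results = new ArrayObject();

// Shared: another variable holding the same object handle
$sameHandle = $results;
$sameHandle[] = rand(0, 10);

// Copied: what a serialized hand-off gives the receiving side
$copy = unserialize(serialize($results));
$copy[] = rand(0, 10);

var_dump(count($results)); // int(1): sees the shared write only
var_dump(count($copy));    // int(2): its extra element never reaches $results
```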

As much as possible, if you have to move around data between threads, try to always pass children of Threaded (like Stackable), they won’t be serialized and were made specially to be transported across threads with no data corruption or memory issues.

What are you waiting for?

So here we go: that was an in-depth introduction to threading in PHP through pthreads. I hope that next time you have a bunch of things to do at the same time, you’ll think twice before discarding PHP as an option completely. Threading won’t always be the solution; it’s for specific use cases that can be split into separate units of work (which is why the command bus fits into it so nicely), but it is nonetheless a solid tool to have in your toolbox.

The pthreads extension is crazy good: it’s well maintained, it’s already compatible with PHP7, and a new version just came out (the version used in this article’s examples was 2.0.10; the latest, for PHP7, is 3.0.8).

Even better, the extension’s repository is available on Github and it is rife with examples, documentation and explanations about the inner mechanics of it all. The author (krakjoe) also has a very good series of gists to explain the reasoning behind pthreads and threading in PHP in general, which I highly recommend.

All the examples in this article are available in a repository if you need them again.

So go forth, my friends, and thread carefully!
