WEB-d Développement Web

PHP, SQL, HTML5, CSS3, Javascript, Mootools, Référencement, SEO, CMS, e-commerce, Apache, Linux, Ubuntu, ...

PHP thread, stackable et worker

Une fois l'extension pthread installée, reste à utiliser les threads efficacement.

Stackable

Comme on l'a montré dans le post précédent sur les threads PHP, les espaces de mémoire des différents threads sont protégés. Si un objet doit être modifié par différents threads, la classe de cet objet doit étendre la classe Stackable. Celle-ci comporte une méthode abstraite, run(), qui peut rester vide:


class Counter extends Stackable
{
    public $value = 0;
   
    public function inc ( ) {
        $this->value++;
        echo $this->value . "\n";
    }
   
    public function run() {}
}

class My_Thread extends Thread
{
    public $counter ;

    public function __construct ( Counter $c ) {
        $this->counter = $c;
    }

    public function run ( ) {
        echo "Thread running...\n";
        for ( $i = 0; $i < 10 ;$i++ ) {
            $this->counter->inc();
        }
    }
}

$c = new Counter();

$t1 = new My_Thread($c);
$t1->start();
$t1->join();
 

Cette fois-ci, le code produit le résultat escompté: 1 2 3 ...

Tableaux (Array)

La classe Stackable peut également être accédée comme un tableau (array). On peut donc l'utiliser pour créer un tableau qui pourra être modifié par différents threads:


<?php

class MyArray extends Stackable {
    public function run() {}
}

class MyThread extends Thread {
  public $shared_array;
 
  public function __construct($shared_array){
    $this->shared_array = $shared_array;
    $this->start();
  }

  public function run(){
    $this->shared_array[] = rand(0, 10);
  }
}

$shared_array = new MyArray();

// Lancer 10 threads qui vont modifier le même "tableau"
for ($i=0; $i<10; $i++) {
  $thread = new MyThread($shared_array);
}

// Attendre que tous les threads aient fini
// (on devrait idéalement utiliser wait)
sleep(1);

var_dump($shared_array);
 

Worker

Un objet de la classe (ou héritant de la classe) Worker peut-être considéré comme un thread persistant. On peut lui faire exécuter plusieurs "tâches", sous la forme d'objets de la classe Stackable. Si de nombreuses tâches doivent être exécutées, on économise ainsi le coût de création des threads:


class MyTask extends Stackable {
  public $result = 0;

  public function run() {
    // Une opération longue
    for ($i=0; $i<100000; $i++) {
      $this->result += rand() * $i / 100000;
    }
  }
}

class MyWorker extends Worker {
  public function run() {
    // Rien de spécial...
  }
}

$worker = new MyWorker();
$worker->start();

$tasks = array();

for ($i=0; $i<10; $i++) {
  $task = new MyTask();
  $worker->stack($task);
  $tasks[] = $task;
}

// On attend que toutes les tâches aient fini...
sleep(5);
foreach ($tasks as $task) {
  echo $task->result . "\n";
}
 

Attention: contrairement à la classe Thread, la classe Stackable ne possède pas de méthode join()...

Chaque objet Stackable peut également accéder au Worker, afin d'y stocker des informations par exemple. Dans le code ci-dessous, on tente d'exécuter 1000 tâches. L'instruction usleep(1000) fait que chaque tâche durera au moins 1ms. Après 1 seconde, toutes les tâches ne sont donc pas exécutées. Le nombre réel de tâches exécutées oscillera généralement entre 500 et 800:


class MyTask extends Stackable {
  public $result = 0;

  public function run() {
    usleep(1000);
    $this->worker->incTaskCount();
  }
}

class MyWorker extends Worker {
  protected $task_count = 0;
 
  protected function incTaskCount() {
    $this->task_count++;
  }

  public function getTaskCount() {
    return $this->task_count;
  }
 
  public function run() {
    // Rien de spécial...
  }
}

$worker = new MyWorker();
$worker->start();

$tasks = array();

for ($i=0; $i<1000; $i++) {
  $task = new MyTask();
  $worker->stack($task);
  $tasks[] = $task;
}

sleep(1);

echo $worker->getTaskCount() . " tâches exécutées...\n";
 

Distribution des tâches : ordonnancement "round-robin"

Les threads sont généralement utilisés pour 2 raisons:

  1. Soit exécuter plusieurs tâches intensives en parallèle sur plusieurs CPU ou coeurs (distributed computing);
  2. Soit exécuter des tâches légères, mais lentes (par exemple accéder au réseau), afin d'éviter que le thread principal ne soit bloqué.

Dans les 2 cas on peut utiliser la classe Worker. Dans le premier cas on crée autant de Workers qu'il n'y a de coeurs présents sur la machine, multiplié par 2 si le processeur supporte l'hypterthreading. Dans le second cas on choisit un nombre élevé de Workers (100 par exemple).

L'extension pthreads n'offre pas actuellement de mécanisme d'ordonnancement. La méthode la plus simple sera donc de créer une méthode du type "round-robin", qui distribue les tâches à exécuter équitablement entre les différents workers:


class MyTask extends Stackable {
  public $result = 0;

  public function run() {
    // Une opération longue
    for ($i=0; $i<100000; $i++) {
      $this->result += rand() * $i / 100000;
    }
  }
}

class MyWorker extends Worker {
  public function run() {
    // Rien de spécial...
  }
}

$WORKER_COUNT = 4;
$WORKER_INDEX = 0;

$workers = array();
for ($i=0; $i<$WORKER_COUNT; $i++) {
  $worker = new MyWorker();
  $worker->start();
  $workers[] = $worker;
}

// Distribuer beaucoup de tâches
$tasks = array();
for ($i=0; $i<200; $i++) {
  $WORKER_INDEX = ($WORKER_INDEX + 1) % $WORKER_COUNT;
  $task = new MyTask();
  $workers[$WORKER_INDEX]->stack($task);
  $tasks[] = $task;
}
 

Mon PC étant équipé d'un CPU dual-core avec hyperthreading, mon CPU est complètement occupé (400%) avec 4 workers:

PHP workers

Attention: n'oubliez pas de toujours garder une référence vers vos objets Stackable jusqu'à la fin de leur exécution (comme ci-dessus, avec le tableau $tasks), sinon vous risquez d'obtenir des erreurs de segmentation (sementation fault) ou des erreurs fatales "Allowed memory size exhausted" de façon assez aléatoire!

Articles similaires