Publi

Jugando con threads y mutex en C++11

Hace poco tuve un problema en el que la concurrencia era fundamental para realizar la tarea en el mínimo tiempo posible, y decidí darle una oportunidad a C++11 y a algunas de sus nuevas características (todo esto ya lo podíamos hacer con las bibliotecas Boost por ejemplo, pero da alegría poder hacer muchas cosas desde std 🙂

Aprovechando mi experiencia he decidido poner un pequeño ejemplo de cómo realizar una aplicación concurrente en esta revisión de C++, cosa que veo la tarea más fácil del mundo y, por supuesto nos beneficiará a todos a la hora de buscar el paralelismo.

Para los que no tengan mucha idea de lo que hablo, voy a dar una breve introducción. Hace unos años, todos teníamos un ordenador con un sólo procesador, y un sólo núcleo, y eran pocos los que tenían una placa base que aceptaba dos procesadores; poco a poco, actualmente ahora casi todo el mundo tiene por lo menos un ordenador con dos núcleos. En principio, para un uso normal, el sistema operativo va repartiendo las tareas entre esos dos núcleos, y el rendimiento general es mayor. Pero, ¿y si vamos a realizar una tarea realmente pesada y queremos tener a nuestra disposición todos los núcleos disponibles, y ejecutar parte de la tarea en cada uno? Para eso tenemos la programación concurrente, y en este caso los threads o hilos de ejecución, donde programaremos qué hace cada hilo. En realidad para utilizar threads no tenemos por qué tener varios cores (era sólo un ejemplo), en realidad, durante la ejecución de un programa se realizan numerosas tareas, en las cuales, por ejemplo, hay esperas (por entrada/salida de un dispositivo por ejemplo), podríamos aprovechar el tiempo en el que un proceso pasa esperando a un dispositivo, para ejecutar código de otro proceso, y estaremos ganando tiempo.

La concurrencia, así visto, parece que todo son ventajas, lo único malo, es que tenemos que programar qué tareas serán las concurrentes, si tenemos una tarea muy pesada, pensar en la mejor forma de dividirla, y optimizar los hilos de ejecución que vamos a lanzar, en principio, una forma de hacernos una idea es mirar los cores disponibles en el sistema.

Por otro lado, a veces tenemos la necesidad de que dos hilos de ejecución (o varios) hablen entre sí, es decir, intercambien información o actualicen una misma variable; por ejemplo estamos haciendo una suma de cientos de miles de números, y hemos decidido dividirla en 4, así que cada hilo irá sumando a una variable números constantemente, pero claro, son 4 procesos independientes accediendo a un mismo espacio de memoria, por lo que es posible que varios procesos coincidan a la hora de actualizar el total y, ¿qué pasará? que sólo el último proceso que haya escrito, será el que haya dejado su valor en la variable, evitando que las actualizaciones anteriores tengan efecto. Esto es la condición de carrera.

Para hablar de la condición de carrera, me gusta imaginarme un baño público en el que sólo cabe uno, por ejemplo, la variable (sólo se puede realizar una operación en ella a la vez). Si cuatro personas entran en el baño al mismo tiempo, y es tan pequeño como los de la foto… ¡ no sabemos qué puede salir de ahí ! y seguro que las cuatro personas acaban molestas. En este momento, introducimos el mutex (mutual exclusion o exclusión mutua), es decir, el seguro en la puerta del baño. Este seguro hace que cuando uno entre, no pueda entrar nadie más, por lo que cuando el proceso 1 entra, hace todas las operaciones necesarias, y sale (dejando el seguro abierto), el proceso 2 entrará y mientras realiza sus operaciones, tal vez vengan el proceso 3 y el proceso 4 a realizar operaciones, pero se quedarán a la cola, hasta que 2 termine.

Ahora, vamos a ver un poco de código. Primero tenemos que ver que en C++11 para crear un hilo de ejecución o thread es necesario incluir la biblioteca (y en Linux compilar con -lpthread). C++11 también soporta funciones lambda o anónimas, por lo que todo será mucho más fácil:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#include <iostream>
#include <chrono>
#include <string>
#include <thread>

// Variable compartida
static int num=0;

// Función principal
void suma(int n)
{    
// Introducimos una espera, porque el proceso a realizar aquí tarda un tiempo, y no siempre es el mismo.
// Sólo simulamos un proceso más complicado...
    std::this_thread::sleep_for(std::chrono::milliseconds(n));
    num+=n;
}


int main()
{
  // Lanzamos muchos hilos de ejecución, unos suman, otrs restan...
  std::thread([](){ for (int i = 0; i < 30; ++i) suma(-i); }).detach();
  std::thread([](){ for (int i = 0; i < 16; ++i) suma(i); }).detach();
  std::thread([](){ for (int i = 0; i < 20; ++i) suma(-i); }).detach();
  std::thread([](){ for (int i = 0; i < 16; ++i) suma(i); }).detach();
  std::thread([](){ for (int i = 0; i < 30; ++i) suma(i); }).detach();
  std::thread([](){ for (int i = 0; i < 16; ++i) suma(i); }).detach();
  std::thread([](){ for (int i = 0; i < 20; ++i) suma(i); }).detach();
  std::thread([](){ for (int i = 0; i < 16; ++i) suma(i); }).detach();

  // Introducimos una espera para que todos los threads hayan terminado
  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  std::cout<<std::endl<<num<<std::endl;
}

Para compilar lo debemos hacer de la siguiente forma:

g++ -o threads threads.cpp -std=c++11 -pthreads

Tal vez, en el futuro no tengamos que decir -std=c++11, pero por ahora, mientras el soporte de GCC sea experimental debemos hacerlo.

Hemos lanzado 8 procesos, cada uno hace una llamada a suma() un número determinado de veces, en definitiva, queremos hacer la suma total, que nos dará 480. Justo cuando empieza suma() he incluido una espera variable:

1
std::this_thread::sleep_for(std::chrono::milliseconds(n));

Y al final del programa, otra espera de 1000ms (si tenemos un ordenador lento, pondremos 2000 o 3000, pero no creo que se demore tanto). Otra opción en lugar de incluir la espera, sería introducir los threads en un array y esperar a su terminación… ¡ pero esto es sólo un experimento !

Cuando termina la ejecución (después de ejecutarlo varias veces), veremos unos resultados curiosos:

$ ./threads
462
$ ./threads
474
$ ./threads
475
$ ./threads
421
$ ./threads
460

Vamos, que no da una, y es que al tener varios procesos que quieren escribir a la vez en la variable, perdemos estados en la suma que no son leídos y por tanto el resultado no es correcto. Por tanto, los threads, vistos así están muy bien para realizar tareas que no tienen nada que ver entre sí, las cuales las vamos a hacer al mismo tiempo.

Ahora introducimos código para realizar la exclusión mutua, lo que vamos a hacer es bloquear la actualización de la variable con el fin de que sólo uno de los hilos pueda estar actualizando el valor y si alguien más quiere actualizar, que se espere.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include <iostream>
#include <chrono>
#include <mutex>
#include <string>
#include <thread>

// Variable compartida
static int num=0;

// Función principal
void suma(int n)
{    
    static std::mutex m;
    std::this_thread::sleep_for(std::chrono::milliseconds(n));
    m.lock();
    num+=n;
    m.unlock();
}


int main()
{
  // Lanzamos muchos hilos de ejecución, unos suman, otrs restan...
  std::thread([](){ for (int i = 0; i < 30; ++i) suma(-i); }).detach();
  std::thread([](){ for (int i = 0; i < 16; ++i) suma(i); }).detach();
  std::thread([](){ for (int i = 0; i < 20; ++i) suma(-i); }).detach();
  std::thread([](){ for (int i = 0; i < 16; ++i) suma(i); }).detach();
  std::thread([](){ for (int i = 0; i < 30; ++i) suma(i); }).detach();
  std::thread([](){ for (int i = 0; i < 16; ++i) suma(i); }).detach();
  std::thread([](){ for (int i = 0; i < 20; ++i) suma(i); }).detach();
  std::thread([](){ for (int i = 0; i < 16; ++i) suma(i); }).detach();

  // Introducimos una espera para que todos los threads hayan terminado
  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  std::cout<<std::endl<<num<<std::endl;
}

El código es muy parecido, sólo introducimos unas líneas referentes al mutex (para declararlo, para bloquearlo y desbloquearlo, en este caso sólo bloqueamos la suma, que es la operación conflictiva.
En este caso al ejecutarlo:

$ ./threads
480
$ ./threads
480
$ ./threads
480
$ ./threads
480
$ ./threads
460

Para concluir, una cosa más, es una forma más sencilla de bloquear, y que nos permite olvidarnos de desbloquear (desbloqueará cuando termine la función):

1
2
3
4
5
6
7
void suma(int n)
{    
    static std::mutex m;
    std::this_thread::sleep_for(std::chrono::milliseconds(n));
    std::lock_guard<std::mutex> lock(m);
    num+=n;
}

Foto: Gonmi (Flickr) CC-A a 04/03/2013

También podría interesarte...

There are 2 comments left Ir a comentario

  1. Emilio /
    Usando Google Chrome Google Chrome 49.0.2623.112 en Windows Windows XP

    Esto tendrá unos años pero la verdad que está muy bien explicado.
    Gracias!

    1. Gaspar Fernández / Post Author
      Usando Mozilla Firefox Mozilla Firefox 52.0 en Ubuntu Linux Ubuntu Linux

      ¡Gracias Emilio! Sí que tiene un tiempo ya. Cualquier día actualizo el post metiendo algunas cosas nuevas que he ido aprendiendo .

Leave a Reply