viernes, 27 de abril de 2012

One producer, two consumers and a fail

Pues bien, aquí comentando una prueba cronometrada que me tocó hacer anteayer; resulta que tenía 4 horas para implementar un programita multihilo en C y es una de las cosas que siempre tenía ilusión por aprender; el tema de garantizar que la gestión de recursos compartidos entre hilos vaya como debe.

Pues bien, googleo un poco, llego a aquí y digo: "bien, el último código es casi igual que lo que me piden añadiendo un consumidor"; es decir: tenía que implementar tres hilos, el primero debía escribir cadenas en una lista, y los dos restantes ir leyendo mensajes de dicha lista purgándola a su vez.

Bien, en cosa de una hora me aclaro de qué significa cada parte, los mutex y los eventos e implemento un maravilloso programa que parece hacer lo que se pide que haga a excepción de que no emplea la lista y si le sacas los sleeps (que me pidieron insertar entre lecturas y escrituras) acaba en un rotundo deadlock a los pocos segundos de ejecución. Viendo que no estaba a la altura de lo que se pedía, envío el fail pensando: "bueno ... ya que he invertido una hora en esta cosa por lo menos lo envío qué carajo !"

Bueno, la cuestión es que ayer noche me piqué para ver cuál era el fallo y acabé con esto:


#include "StdAfx.h"
#include <windows.h>
#include <iostream>
#include <stdlib.h>
#include <math.h>

#define MAX_LIST_SIZE 100000
#define MAX_STRING_SIZE 256
#define READERS 10

HANDLE _hMutex, _hWriteDone, _hReadDone;

/* List structure, init, pop and push methods */

struct strlst {
  int size;
  char list[MAX_LIST_SIZE][MAX_STRING_SIZE];
} _strlst;

void pop_list(char *str,struct strlst *sl)
{
  if ( sl->size == 0 ) {
    fprintf(stderr,"List empty\n");
    str[0] = '\0';
    return;
  }
  strcpy(str,sl->list[sl->size-1]);
  sl->size--;
  return;
}

void push_list(char *str,struct strlst *sl)
{
  if ( sl->size >= (MAX_LIST_SIZE-1) ) {
    fprintf(stderr,"List overflow, message lost: %s\n",str);
    return;
  }
  strcpy(sl->list[sl->size],str);
  sl->size++;
  return;
}

void init_list(struct strlst *sl)
{
  sl->size = 0;
}

/* Writer function */

void Writer(void *myname)
{
  char msg[MAX_STRING_SIZE];
  char name[MAX_STRING_SIZE];
  int counter = 0;
  strcpy(name,(char *)myname);
  while ( true ) {
    while (true) {
      if (WaitForSingleObject(_hMutex, INFINITE) == WAIT_FAILED) {
    /* Something went wrong */
    ExitThread(0);
      }
      if ( _strlst.size > MAX_LIST_SIZE-1 ) {
    /* Can't write anything, list is full, waiting to a reader to pop an element */
    ReleaseMutex(_hMutex);
    WaitForSingleObject(_hReadDone, INFINITE);
    continue;
      }
      break;
    }
    /* Accessing the list and pulsing write event */
    sprintf(msg,"%d written by thread %d aka %s (list size: %d)",counter,GetCurrentThreadId(),name,_strlst.size+1);
    counter++;
    push_list(msg,&_strlst);
    ReleaseMutex(_hMutex);
    PulseEvent(_hWriteDone);
  }
}

/* Reader function */

void Reader(void *myname)
{
  char name[MAX_STRING_SIZE];
  strcpy(name,(char *)myname);
  char msg[MAX_STRING_SIZE];
  while(true) {
    if (WaitForSingleObject(_hMutex, INFINITE) == WAIT_FAILED) {
      /* Something went wrong */
      ExitThread(0);
    }
    if (_strlst.size == 0) {
      /* Nothing to read on the list waiting for the writer */
      ReleaseMutex(_hMutex);
      WaitForSingleObject(_hWriteDone, INFINITE);
      continue;
    }
    /* Reading the list and pulsing read event */
    pop_list(msg,&_strlst);
    printf("%s and read by thread %d aka %s\n",msg,GetCurrentThreadId(),name);
    ReleaseMutex(_hMutex);
    Sleep(10*rand()/RAND_MAX);
    PulseEvent(_hReadDone);
  }
}

void main() 
{
  HANDLE TName[1+READERS];
  DWORD ThreadID;
  char names[1+READERS][MAX_STRING_SIZE];
  /* init list */
  init_list(&_strlst); 
  _hMutex= CreateMutex(NULL, FALSE, NULL);
  _hWriteDone= CreateEvent(NULL, TRUE, FALSE, NULL);
  _hReadDone= CreateEvent(NULL, TRUE, FALSE, NULL);

  sprintf(names[0],"W1");
  /* creating writer thread */
  TName[0]= CreateThread(NULL, 0,
             (LPTHREAD_START_ROUTINE)Writer,
             (void *)names[0], 0, &ThreadID);
  for(int i=0;i<READERS;i++) {
    /* creating reader i+1 */
    sprintf(names[i+1],"R%d",i+1);
    TName[i+1]= CreateThread(NULL, 0,
                 (LPTHREAD_START_ROUTINE)Reader,
                 (void *)names[i+1], 0, &ThreadID);
  }
  WaitForMultipleObjects(READERS+1, TName, TRUE, INFINITE);
  CloseHandle(TName);
}

La primera parte, es la definición de la lista de cadenas que contendrá los mensajes a compartir entre el escritor y los lectores; en concreto un productor (Writer) y tantos lectores como READERS (en el caso 10). Para la sincronización empleo un mutex _hMutex y dos eventos _hWriteDone y _hReadDone.

El productor Writer inicializa el contador de mensajes counter a 0 y se mete en un bucle infinito donde inicialmente espera luz verde para acceder a la lista: WaitForSingleObject(_hMutex,INFINITE), a continuación chequea si la lista está llena en cuyo caso libera _hMutex y espera que se produzca alguna lectura WaitForSingleObject(_hReadDone,INFINITE) y en caso contrario añade un mensaje a la lista, libera _hMutex y señala el evento de que se ha producido una nueva inserción en la lista: PulseEvent(_hWriteDone).

Los consumidores Reader empiezan esperando acceso a la lista esperando _hMutex libre; una vez tienen acceso comprueban si la lista está vacía en cuyo caso esperan a que el escritor escriba algo en ella tras liberar _hMutex de nuevo: WaitForSingleObject(_hWriteDone,INFINITE).
En caso de que la lista contenga algún elemento, proceden a extraerlo y mostrarlo por pantalla; liberan _hMutex  y señalan que se ha producido una lectura de modo que Writer sepa que la lista se ha purgado.

Creo que lo he conseguido hacer de forma correcta, lo he probado con diferentes tamaños de lista, retardos en las lecturas (para forzar la situación en que la lista se llena y algunos mensajes no llegan a ser escritos: list overflow, message lost y tal ...) y distintos números de lectores, etc ... y parece que chuta correctamente durante horas :)

100193 written by thread 372 aka W1 (list size: 99992) and read by thread 4852 aka R5
100283 written by thread 372 aka W1 (list size: 99992) and read by thread 4852 aka R5
List overflow, message lost: 100292 written by thread 372 aka W1 (list size: 100000)
List overflow, message lost: 100293 written by thread 372 aka W1 (list size: 100000)
List overflow, message lost: 100294 written by thread 372 aka W1 (list size: 100000)
List overflow, message lost: 100295 written by thread 372 aka W1 (list size: 100000)
List overflow, message lost: 100296 written by thread 372 aka W1 (list size: 100000)
List overflow, message lost: 100297 written by thread 372 aka W1 (list size: 100000)
100291 written by thread 372 aka W1 (list size: 99999) and read by thread 4852 aka R5
100290 written by thread 372 aka W1 (list size: 99998) and read by thread 5652 aka R9
100289 written by thread 372 aka W1 (list size: 99997) and read by thread 1388 aka R10
100288 written by thread 372 aka W1 (list size: 99996) and read by thread 3400 aka R8
100287 written by thread 372 aka W1 (list size: 99995) and read by thread 5064 aka R2
List overflow, message lost: 100303 written by thread 372 aka W1 (list size: 100000)
List overflow, message lost: 100304 written by thread 372 aka W1 (list size: 100000)


En cualquier caso, el ejercicio me ha servido de motivación para meterle mano al tema de comunicación entre hilos que siempre hasta ahora los había soltado en paralelo de forma independiente y/o utilizando frameworks que encapsulaban la complejidad de acceso a recursos compartidos entre ellos; ahora lo siguiente es implementarlo usando pthreads en Linux y ya si eso tratar de añadirle más productores a ver cómo. Si alguien que entiende ve algún fallo o error de concepto y/o forma alternativa de hacer lo mismo más clara y eficiente y además se digna a dejarlo en comentarios le estaré eternamente agradecido.

Este video es una Informacion Real,solamente pretendo Informar de la Noticia
Sucribete y dar a Like. Muchisimas Gracias.



No hay comentarios:

Publicar un comentario