lunes, 30 de abril de 2012

Implementación con pthread

Después de comentarlo con algún que otro entendido del irc, me comentaron que lo mejor era encapsular todo lo referente a mutex y condiciones dentro de la propia estructura de datos de la lista:


#ifndef SYNCLIST_H
#define SYNCLIST_H

#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>

struct synclist
{
  int max_size;
  int max_string_size;
  char **element;
  pthread_mutex_t mutex;
  pthread_cond_t read_done,write_done;
  int size;
};

void synclist_init(struct synclist *o,
           int max_size,int max_string_size)
{
  int i;
  pthread_mutex_init(&o->mutex,NULL);
  pthread_cond_init(&o->read_done,NULL);
  pthread_cond_init(&o->write_done,NULL);
  o->max_size = max_size;
  o->max_string_size = max_string_size;
  o->element = (char **)malloc(o->max_size*sizeof(char *));
  for(i=0;i<o->max_size;i++) {
    o->element[i] = (char *)malloc(o->max_string_size*sizeof(char));
  }
  o->size = 0;
}

void synclist_destroy(struct synclist *o)
{
  int i;
  pthread_mutex_lock(&o->mutex);
  pthread_mutex_destroy(&o->mutex);
  pthread_cond_destroy(&o->read_done);
  pthread_cond_destroy(&o->write_done);
  for(i=0;i<o->max_size;i++) {
    free(o->element[i]);
  }
  free(o->element);
}

void synclist_push(struct synclist *o,char *element)
{
  pthread_mutex_lock(&o->mutex);
  while ( o->size >= (o->max_size) ) {
    /* List is full, waiting for read_done */
    pthread_cond_wait(&o->read_done,&o->mutex);
  }
  /* Write and unlock */
  strncpy(o->element[o->size],element,o->max_string_size);
  o->size++;
  pthread_cond_signal(&o->write_done);
  pthread_mutex_unlock(&o->mutex);
}

void synclist_pop(struct synclist *o,char *element)
{
  pthread_mutex_lock(&o->mutex);
  while ( o->size <= 0 ) {
    /* Empty list, waiting for write_done */
    pthread_cond_wait(&o->write_done,&o->mutex);
  }
  /* Read and unlock */
  strncpy(element,o->element[o->size-1],o->max_string_size);
  o->size--;
  pthread_cond_signal(&o->read_done);
  pthread_mutex_unlock(&o->mutex);
}

#endif

A su vez, meterlo todo en la cabecera synclist.h y a continuación emplearla desde por ejemplo:


#include "synclist.h"
#include <stdio.h>

#define LIST_SIZE 10
#define BUFFER_SIZE 1024
#define READERS 5

void *writer(void *mysl)
{
  int counter = 0;
  char msg[BUFFER_SIZE];
  struct synclist *sl;
  sl = (struct synclist *)mysl;
  while ( 1 ) {
    sprintf(msg,"message #%d",counter);
    counter++;
    synclist_push(sl,msg);
    printf("Writing msg %s (list size %d) (thread id %d)\n",
            msg,sl->size,abs((unsigned int)pthread_self()));
    fflush(stdout);
  }
}

void *reader(void *mysl)
{
  char msg[BUFFER_SIZE];
  struct synclist *sl;
  sl = (struct synclist *)mysl;
  while ( 1 ) {
    synclist_pop(sl,msg);
    printf("Reading msg %s (list size %d) (thread id %d)\n",
       msg,sl->size,abs((unsigned int)pthread_self()));
    fflush(stdout);
  }
}

int main(int argc,char *argv[])
{
  struct synclist sl;
  char buffer[BUFFER_SIZE];
  pthread_t t[READERS+1];
  int i;
  synclist_init(&sl,LIST_SIZE,BUFFER_SIZE);
  pthread_create(&t[0],NULL,writer,(void *)&sl);
  for(i=0;i<READERS;i++) {
    pthread_create(&t[i+1],NULL,reader,(void *)&sl);
  }
  while ( 1 ) {
    continue;
  }
  synclist_destroy(&sl);
  return 0;
}


viernes, 27 de abril de 2012

One producer, two consumers and a fail

Pues bien, aquí comentando una prueba cronometrada que me tocó hacer anteayer; resulta que tenía 4 horas para implementar un programita multihilo en C y es una de las cosas que siempre tenía ilusión por aprender; el tema de garantizar que la gestión de recursos compartidos entre hilos vaya como debe.

Pues bien, googleo un poco, llego a aquí y digo: "bien, el último código es casi igual que lo que me piden añadiendo un consumidor"; es decir: tenía que implementar tres hilos, el primero debía escribir cadenas en una lista, y los dos restantes ir leyendo mensajes de dicha lista purgándola a su vez.

Bien, en cosa de una hora me aclaro de qué significa cada parte, los mutex y los eventos e implemento un maravilloso programa que parece hacer lo que se pide que haga a excepción de que no emplea la lista y si le sacas los sleeps (que me pidieron insertar entre lecturas y escrituras) acaba en un rotundo deadlock a los pocos segundos de ejecución. Viendo que no estaba a la altura de lo que se pedía, envío el fail pensando: "bueno ... ya que he invertido una hora en esta cosa por lo menos lo envío qué carajo !"

Bueno, la cuestión es que ayer noche me piqué para ver cuál era el fallo y acabé con esto:


#include "StdAfx.h"
#include <windows.h>
#include <iostream>
#include <stdlib.h>
#include <math.h>

#define MAX_LIST_SIZE 100000
#define MAX_STRING_SIZE 256
#define READERS 10

HANDLE _hMutex, _hWriteDone, _hReadDone;

/* List structure, init, pop and push methods */

struct strlst {
  int size;
  char list[MAX_LIST_SIZE][MAX_STRING_SIZE];
} _strlst;

void pop_list(char *str,struct strlst *sl)
{
  if ( sl->size == 0 ) {
    fprintf(stderr,"List empty\n");
    str[0] = '\0';
    return;
  }
  strcpy(str,sl->list[sl->size-1]);
  sl->size--;
  return;
}

void push_list(char *str,struct strlst *sl)
{
  if ( sl->size >= (MAX_LIST_SIZE-1) ) {
    fprintf(stderr,"List overflow, message lost: %s\n",str);
    return;
  }
  strcpy(sl->list[sl->size],str);
  sl->size++;
  return;
}

void init_list(struct strlst *sl)
{
  sl->size = 0;
}

/* Writer function */

void Writer(void *myname)
{
  char msg[MAX_STRING_SIZE];
  char name[MAX_STRING_SIZE];
  int counter = 0;
  strcpy(name,(char *)myname);
  while ( true ) {
    while (true) {
      if (WaitForSingleObject(_hMutex, INFINITE) == WAIT_FAILED) {
    /* Something went wrong */
    ExitThread(0);
      }
      if ( _strlst.size > MAX_LIST_SIZE-1 ) {
    /* Can't write anything, list is full, waiting to a reader to pop an element */
    ReleaseMutex(_hMutex);
    WaitForSingleObject(_hReadDone, INFINITE);
    continue;
      }
      break;
    }
    /* Accessing the list and pulsing write event */
    sprintf(msg,"%d written by thread %d aka %s (list size: %d)",counter,GetCurrentThreadId(),name,_strlst.size+1);
    counter++;
    push_list(msg,&_strlst);
    ReleaseMutex(_hMutex);
    PulseEvent(_hWriteDone);
  }
}

/* Reader function */

void Reader(void *myname)
{
  char name[MAX_STRING_SIZE];
  strcpy(name,(char *)myname);
  char msg[MAX_STRING_SIZE];
  while(true) {
    if (WaitForSingleObject(_hMutex, INFINITE) == WAIT_FAILED) {
      /* Something went wrong */
      ExitThread(0);
    }
    if (_strlst.size == 0) {
      /* Nothing to read on the list waiting for the writer */
      ReleaseMutex(_hMutex);
      WaitForSingleObject(_hWriteDone, INFINITE);
      continue;
    }
    /* Reading the list and pulsing read event */
    pop_list(msg,&_strlst);
    printf("%s and read by thread %d aka %s\n",msg,GetCurrentThreadId(),name);
    ReleaseMutex(_hMutex);
    Sleep(10*rand()/RAND_MAX);
    PulseEvent(_hReadDone);
  }
}

void main() 
{
  HANDLE TName[1+READERS];
  DWORD ThreadID;
  char names[1+READERS][MAX_STRING_SIZE];
  /* init list */
  init_list(&_strlst); 
  _hMutex= CreateMutex(NULL, FALSE, NULL);
  _hWriteDone= CreateEvent(NULL, TRUE, FALSE, NULL);
  _hReadDone= CreateEvent(NULL, TRUE, FALSE, NULL);

  sprintf(names[0],"W1");
  /* creating writer thread */
  TName[0]= CreateThread(NULL, 0,
             (LPTHREAD_START_ROUTINE)Writer,
             (void *)names[0], 0, &ThreadID);
  for(int i=0;i<READERS;i++) {
    /* creating reader i+1 */
    sprintf(names[i+1],"R%d",i+1);
    TName[i+1]= CreateThread(NULL, 0,
                 (LPTHREAD_START_ROUTINE)Reader,
                 (void *)names[i+1], 0, &ThreadID);
  }
  WaitForMultipleObjects(READERS+1, TName, TRUE, INFINITE);
  CloseHandle(TName);
}

La primera parte, es la definición de la lista de cadenas que contendrá los mensajes a compartir entre el escritor y los lectores; en concreto un productor (Writer) y tantos lectores como READERS (en el caso 10). Para la sincronización empleo un mutex _hMutex y dos eventos _hWriteDone y _hReadDone.

El productor Writer inicializa el contador de mensajes counter a 0 y se mete en un bucle infinito donde inicialmente espera luz verde para acceder a la lista: WaitForSingleObject(_hMutex,INFINITE), a continuación chequea si la lista está llena en cuyo caso libera _hMutex y espera que se produzca alguna lectura WaitForSingleObject(_hReadDone,INFINITE) y en caso contrario añade un mensaje a la lista, libera _hMutex y señala el evento de que se ha producido una nueva inserción en la lista: PulseEvent(_hWriteDone).

Los consumidores Reader empiezan esperando acceso a la lista esperando _hMutex libre; una vez tienen acceso comprueban si la lista está vacía en cuyo caso esperan a que el escritor escriba algo en ella tras liberar _hMutex de nuevo: WaitForSingleObject(_hWriteDone,INFINITE).
En caso de que la lista contenga algún elemento, proceden a extraerlo y mostrarlo por pantalla; liberan _hMutex  y señalan que se ha producido una lectura de modo que Writer sepa que la lista se ha purgado.

Creo que lo he conseguido hacer de forma correcta, lo he probado con diferentes tamaños de lista, retardos en las lecturas (para forzar la situación en que la lista se llena y algunos mensajes no llegan a ser escritos: list overflow, message lost y tal ...) y distintos números de lectores, etc ... y parece que chuta correctamente durante horas :)

100193 written by thread 372 aka W1 (list size: 99992) and read by thread 4852 aka R5
100283 written by thread 372 aka W1 (list size: 99992) and read by thread 4852 aka R5
List overflow, message lost: 100292 written by thread 372 aka W1 (list size: 100000)
List overflow, message lost: 100293 written by thread 372 aka W1 (list size: 100000)
List overflow, message lost: 100294 written by thread 372 aka W1 (list size: 100000)
List overflow, message lost: 100295 written by thread 372 aka W1 (list size: 100000)
List overflow, message lost: 100296 written by thread 372 aka W1 (list size: 100000)
List overflow, message lost: 100297 written by thread 372 aka W1 (list size: 100000)
100291 written by thread 372 aka W1 (list size: 99999) and read by thread 4852 aka R5
100290 written by thread 372 aka W1 (list size: 99998) and read by thread 5652 aka R9
100289 written by thread 372 aka W1 (list size: 99997) and read by thread 1388 aka R10
100288 written by thread 372 aka W1 (list size: 99996) and read by thread 3400 aka R8
100287 written by thread 372 aka W1 (list size: 99995) and read by thread 5064 aka R2
List overflow, message lost: 100303 written by thread 372 aka W1 (list size: 100000)
List overflow, message lost: 100304 written by thread 372 aka W1 (list size: 100000)


En cualquier caso, el ejercicio me ha servido de motivación para meterle mano al tema de comunicación entre hilos que siempre hasta ahora los había soltado en paralelo de forma independiente y/o utilizando frameworks que encapsulaban la complejidad de acceso a recursos compartidos entre ellos; ahora lo siguiente es implementarlo usando pthreads en Linux y ya si eso tratar de añadirle más productores a ver cómo. Si alguien que entiende ve algún fallo o error de concepto y/o forma alternativa de hacer lo mismo más clara y eficiente y además se digna a dejarlo en comentarios le estaré eternamente agradecido.

Este video es una Informacion Real,solamente pretendo Informar de la Noticia
Sucribete y dar a Like. Muchisimas Gracias.