tpl_agent.H

00001 
00002 /*
00003   This file is part of Aleph system
00004 
00005   Copyright (c) 2002, 2003, 2004, 2005, 2006
00006   UNIVERSITY LOS ANDES (ULA) Merida - REPÚBLICA BOLIVARIANA DE VENEZUELA  
00007 
00008   - Center of Studies in Microelectronics & Distributed Systems (CEMISID) 
00009   - ULA Computer Science Department
00010   - FUNDACITE Mérida - Ministerio de Ciencia y Tecnología
00011 
00012   PERMISSION TO USE, COPY, MODIFY AND DISTRIBUTE THIS SOFTWARE AND ITS
00013   DOCUMENTATION IS HEREBY GRANTED, PROVIDED THAT BOTH THE COPYRIGHT
00014   NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES OF THE
00015   SOFTWARE, DERIVATIVE WORKS OR MODIFIED VERSIONS, AND ANY PORTIONS
00016   THEREOF, AND THAT BOTH NOTICES APPEAR IN SUPPORTING DOCUMENTATION.
00017 
00018   Aleph is distributed in the hope that it will be useful, but WITHOUT
00019   ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
00020   or FITNESS FOR A PARTICULAR PURPOSE.
00021 
00022   UNIVERSIDAD DE LOS ANDES requests users of this software to return to 
00023 
00024   Leandro Leon
00025   CEMISID 
00026   Ed La Hechicera 
00027   3er piso, ala sur
00028   Facultad de Ingenieria 
00029   Universidad de Los Andes 
00030   Merida - REPÚBLICA BOLIVARIANA DE VENEZUELA    or
00031 
00032   lrleon@ula.ve
00033 
00034   any improvements or extensions that they make and grant Universidad 
00035   de Los Andes (ULA) the rights to redistribute these changes. 
00036 
00037   Aleph is (or was) granted by: 
00038   - Consejo de Desarrollo Cientifico, Humanistico, Tecnico de la ULA 
00039     (CDCHT)
00040   - Fundacite Mérida
00041 */
00042 
00043 /*****************************************************************
00044                   Grafos de agentes
00045 *****************************************************************/
00046 
00047 /* 
00048    Contribuyentes:
00049 
00050    Aristides Castillo
00051    Derik Romero
00052    Leandro León
00053 */
00054 
00055 # ifndef TPL_AGENT_GRAPH_H
00056 # define TPL_AGENT_GRAPH_H
00057 
00058 # include <iostream>
00059 # include <exception>
00060 # include <sys/types.h>
00061 # include <dlink.H>
00062 # include <tpl_concurrent_graph.H>
00063 # include <tpl_dynArray.H>
00064 # include <tpl_arrayQueue.H>
00065 
00066 using namespace Aleph;
00067 using namespace std;
00068 
00069 namespace Aleph 
00070 {
00071 
00072   /*****************************************************************
00073       Declaraciones adelantadas de clases 
00074   *****************************************************************/
00075 template <typename Agent_Info> class Agent;     // Agente que circula por grafo
00076   
00077 template <typename Node_Info> class Agent_Node; // Nodo de grafo
00078   
00079 
00084     template <typename Agents_Node_Info> 
00085 class Agent_Node : public Concurrent_Node <Agents_Node_Info>
00086 {
00087   
00088 public:
00089 
00090   Dlink agent_list; // lista de agentes que tiene el nodo
00091 
00092   typedef Concurrent_Node<Agents_Node_Info> Base_Node;
00093 
00094   typedef Agents_Node_Info Node_Type;
00095 
00096   Agent_Node() : Base_Node() { /* empty */ }
00097 
00098   Agent_Node(const Node_Type & info) : Base_Node(info) { /* empty */ }
00099 
00100   virtual ~Agent_Node(){ /* empty */ }
00101 }; 
00102   
00103   
00108     template <typename Agents_Arc_Info> 
00109 class Agent_Arc : public Concurrent_Arc<Agents_Arc_Info>    
00110 {
00111 
00112 public:
00113 
00114   Dlink agent_list; // lista de agentes que tiene el arco
00115 
00116   typedef Concurrent_Arc<Agents_Arc_Info> Base_Arc;
00117 
00118   typedef Agents_Arc_Info Arc_Type;
00119 
00120   Agent_Arc() : Base_Arc() { /* empty */ } 
00121 
00122   virtual ~Agent_Arc() { /* empty */ } 
00123 };
00124   
00125 
00188 template <typename __Node,  // Definción de nodo basada Agent_Node<T>
00189           typename __Arc,   // Definición de arco basada en Agent_Arc<T>
00190        typename Agent_Info
00191           > 
00192 class Agent_Graph : public Concurrent_Graph<__Node, __Arc>
00193 {
00194 public:
00195 
00197   typedef __Node Node;
00198 
00200   typedef __Arc Arc;
00201 
00203   typedef Agent_Graph<__Node, __Arc, Agent_Info> graph_type;
00204 
00205 private:
00206 
00207       // Estados de ejecución del grafo
00208   enum Status { 
00209     Init,        // Estado inicial colocado por el constructor de Agent
00210     Running,     // Los agentes se están ejecutando
00211     Suspended,   // La ejecución está suspendida pero puede ser reanudada
00212     Stopped      // La ejecución ha sido detenida (las threads han sido
00213                  // destruidas)
00214   }; 
00215 
00216   const size_t     num_threads;
00217   pthread_t *      threads;     // Arreglo de threads de dimensión num_threads
00218   Status           status;      // Estado de ejecución del grafo
00219 
00220       /* protege las colas ready y suspended y en general todo el grafo */
00221   pthread_mutex_t  graph_mutex;
00222 
00223   /* 
00224      Esta variable de condición se utiliza para bloquear las threads en
00225      dos situaciones:
00226 
00227      1-. Tenemos num_threads para procesar una cantidad arbitraria de
00228          agentes. Sin embargo, en algunas circunstancias puede haber
00229          menos agentes que num_threads. En este caso, debemos suspender
00230          num_threads - num_agents threads que no se pueden procesar
00231          puesto que no tienen un agente para correr. Esta variable de
00232          condición es la que bloquea una thread que no tiene agente para
00233          ejecutar.
00234 
00235      2-. Cuando el grafo se manda a suspender
00236   */
00237   pthread_cond_t graph_cond_var;
00238 
00239 public:
00240 
00289   class Agent
00290   {
00291 
00292     friend class Agent_Graph;
00293 
00294     enum Agent_State
00295       {
00296      Ready,
00297      Executing,
00298      Suspending,
00299      Suspended,
00300      Deleting_From_Ready,
00301      Deleting_From_Suspended
00302       };
00303 
00304         // protege atributos del agente 
00305     pthread_mutex_t mutex;
00306 
00307     Agent_State agent_state;
00308 
00309     pthread_t thread_id;
00310 
00311     Agent_Info info;
00312 
00313         // Enlace a lista de agentes de todo el grafo. Protegido por
00314         // mutex del grafo.
00315     Dlink agent_link_in_graph; 
00316 
00317         // Enlace a cola de listos o suspendidos. Protegido por mutex
00318         // del grafo.
00319     Dlink schedule_link_in_graph;
00320 
00321         // Cada nodo o arco contiene una lista de los agentes que se
00322         // encuentran en él. Este es el enlace. Aquí la protección es
00323         // delicada cuando hay cambio de localidad porque involucra
00324         // tomar el mutex de la localidad, eliminar el agente y soltar
00325         // el mutex. Luego, con la localidad destino, tomar el mutex,
00326         // insertar el agente y soltar.
00327     Dlink location_link;
00328 
00329     bool in_node; // true si agente se encuentra en nodo
00330 
00331     void * location; // puntero a nodo o arco donde se encuentra el agente 
00332 
00333     void * cookie;
00334 
00335   public:
00336 
00340     typedef void (*Transit) (Agent_Graph * /* grafo  */ , 
00341                     Agent * /* agente */ , 
00342                     void * /* cookie */);
00343     
00344     static void transit_fct(Agent_Graph *, Agent *, void*)
00345     {
00346       // empty
00347     }
00348     
00349 
00350   private:
00351 
00352     Transit transit; 
00353 
00354   public:
00355 
00357     Agent() : location(NULL), cookie(NULL) 
00358     {
00359       init_mutex(&mutex);
00360     }
00361 
00364     Agent(const Agent_Info & agent_info, Transit __transit) 
00365       : info(agent_info), location(NULL), cookie(NULL), transit(__transit)
00366     {
00367       init_mutex(&mutex);
00368     }
00369     
00370     virtual ~Agent() 
00371     {
00372       destroy_mutex(&mutex);
00373     }
00374 
00376    typedef Agent_Info Agent_Type;
00377 
00378   private:
00379 
00380     // ejecuta función transito y retorna true si el agente debe ser
00381     // post-procesado en cuyo caso (ATENCIÓN) el mutex queda tomado
00382     bool execute(Agent_Graph * graph)
00383     { 
00384       if (transit == NULL)
00385      throw std::domain_error("Transit function was not specified");
00386 
00387       {
00388      CRITICAL_SECTION(mutex);
00389 
00390      I(agent_state == Ready);
00391 
00392      thread_id   = pthread_self();
00393      agent_state = Executing;
00394       }
00395 
00396       transit(graph, this, cookie); 
00397 
00398       {
00399      CRITICAL_SECTION(mutex);
00400 
00401      if (agent_state == Executing)
00402        {
00403          agent_state = Ready; /* sólo cambiamos estado si éste no ha sido
00404                         cambiado por otra thread */
00405 
00406          return false;
00407        }
00408       
00409      critical_section.disallow_unlock();
00410 
00411      return true;
00412       }
00413     }
00414  
00415         // funciones de conversión a Agent desde un dlink 
00416     LINKNAME_TO_TYPE (Agent, agent_link_in_graph);  
00417     LINKNAME_TO_TYPE (Agent, schedule_link_in_graph);
00418     LINKNAME_TO_TYPE (Agent, location_link);
00419 
00420   public:
00421 
00422     // funciones de acceso a atributos públicos de un agente
00423 
00425     Agent_Info & get_info() 
00426     {
00427       CRITICAL_SECTION(mutex);
00428 
00429       return info;
00430     }
00431   
00433     void *& get_cookie()
00434     {
00435       CRITICAL_SECTION(mutex);
00436 
00437       return cookie;
00438     }
00439 
00442     bool is_in_node()
00443     {
00444       CRITICAL_SECTION(mutex);
00445 
00446       return in_node;
00447     }
00448 
00453     struct Critical_Section : public UseMutex
00454     {
00455       Critical_Section(Agent * agent) : UseMutex(agent->mutex) { /* empty */ }
00456     };
00457   };  
00458 
00489   class Node_to_Node_Agent : public Agent
00490   {
00491 
00492     friend class Agent_Graph;
00493 
00494   public:
00495     
00497     typedef Arc * (*Leave_Node_Fct)(Agent_Graph*, Node*, Node_to_Node_Agent*);
00498 
00500     typedef bool (*Enter_Node_Fct)(Agent_Graph*, Node*, Node_to_Node_Agent*);
00501 
00502   private:
00503 
00504     Leave_Node_Fct leave_node_fct;
00505 
00506     Enter_Node_Fct enter_node_fct;
00507 
00508     static void transit_fct(Agent_Graph * g, Agent * __agent, void*)
00509     {
00510       Node_to_Node_Agent * agent = static_cast<Node_to_Node_Agent*>(__agent);
00511 
00512       Arc * arc       = NULL;
00513       Node * tgt_node = NULL;
00514 
00515       I(agent->in_node);
00516 
00517           // Determinar nodo de residencia actual
00518       Node * curr_node = static_cast<Node*>(agent->location);
00519 
00520           // Entrar a sección crítica curr_node
00521       CRITICAL_SECTION(curr_node->mutex); 
00522 
00523       if (agent->leave_node_fct != NULL)
00524      // Modificar nodo actual y decidir arco.
00525      arc = agent->leave_node_fct(g, curr_node, agent);
00526 
00527 # ifdef nada
00528       if (arc == NULL)
00529      {
00530        g->remove_agent(agent); // eliminar agente del grafo
00531        
00532        return;
00533      }
00534 # endif
00535 
00536       g->leave_agent_from_node(agent); // sacar agente del nodo
00537 
00538       tgt_node = g->get_connected_node(arc, curr_node);
00539 
00540       int lock_status = 0;
00541 
00542       do 
00543      lock_status = pthread_mutex_trylock(&tgt_node->mutex);
00544       while (lock_status == EBUSY);
00545 
00546       if (lock_status != 0)
00547      throw std::domain_error("Mutex cannot be locked");
00548 
00549       // En este punto los mutexes de los dos nodos están tomados
00550 
00551       if (not agent->enter_node_fct(g, tgt_node, agent))
00552      {
00553        pthread_mutex_unlock(&tgt_node->mutex);
00554 
00555        g->remove_agent(agent);
00556        
00557        return;
00558      }
00559 
00560       g->enter_agent_in_node(agent, tgt_node);
00561 
00562       pthread_mutex_unlock(&tgt_node->mutex);
00563   }
00564 
00565   public:
00566 
00567     Node_to_Node_Agent() 
00568     {
00569       leave_node_fct = NULL;
00570       enter_node_fct = NULL;
00571     }
00572 
00573     Node_to_Node_Agent (const Agent_Info & agent_info,
00574                Leave_Node_Fct __leave_node_fct, 
00575                Enter_Node_Fct __enter_node_fct)
00576       : Agent(agent_info, &transit_fct)
00577     {
00578       leave_node_fct = __leave_node_fct;
00579       enter_node_fct = __enter_node_fct;
00580     }
00581   };
00582   
00583 
00584   class Node_to_Arc_Agent : public Agent
00585   {
00586 
00587   };
00588   
00589 
00590   typedef Concurrent_Graph<Node, Arc> Base_Graph;
00591 
00593   typedef typename Node::Node_Type Node_Type; 
00594 
00596   typedef typename Arc::Arc_Type Arc_Type;
00597 
00599   typedef typename Agent::Agent_Type Agent_Type;
00600 
00601 private:
00602 
00605   Dlink  agent_list; // lista de agentes
00606   size_t num_agents;
00607     
00608   Dlink ready_queue; // cola de agentes listos para procesarse
00609   size_t num_agents_ready; 
00610     
00611   Dlink suspended_queue; // cola de agentes suspendidos
00612   size_t num_agents_suspended;  
00613 
00614       // bloquea start_graph() si usuario lo solicita
00615   pthread_cond_t block_starter_cond_var; 
00616 
00617       // inserta agente por el trasero de la cola de listos. 
00618       // ATENCIÓN: No toma el mutex
00619   void insert_agent_in_ready_queue(Agent * agent)
00620   {
00621     ready_queue.append(&agent->schedule_link_in_graph);
00622     num_agents_ready++;
00623     pthread_cond_signal(&graph_cond_var);
00624   }
00625     
00626       // saca de cola de listos el agente del frente. 
00627       // ATENCIÓN: No toma el mutex
00628   Agent * remove_next_ready_agent() 
00629   {
00630     if (not ready_queue.is_empty())
00631       {
00632      Dlink * link = ready_queue.get_next();
00633 
00634      Agent * agent = Agent::schedule_link_in_graph_to_Agent(link);
00635 
00636      Agent * result = static_cast<Agent*>(agent);
00637 
00638      ready_queue.remove_next();
00639      num_agents_ready--;
00640 
00641      return result;
00642       }
00643 
00644     return NULL;
00645   }
00646     
00647       // elimina el agente de la cola de listos; nótese que se trata de
00648       // un agente arbitrario, no del que está en el frente 
00649       // ATENCIÓN: No toma el mutex
00650   void remove_agent_from_ready_queue(Agent * agent)
00651   {
00652     num_agents_ready--;
00653     agent->schedule_link_in_graph.del();
00654   }
00655    
00656       // inserta agente por el trasero de la cola de suspendidos
00657       // ATENCIÓN: No toma el mutex
00658   void insert_agent_in_suspended_queue(Agent * agent)
00659   {
00660     suspended_queue.append(&agent->schedule_link_in_graph);
00661     num_agents_suspended++; 
00662   }
00663 
00664       // elimina el agente de la cola de suspendidos; nótese que se trata de
00665       // un agente arbitrario, no del que está en el frente 
00666       // ATENCIÓN: No toma el mutex
00667   void remove_agent_from_suspended_queue(Agent * agent)
00668   {
00669     agent->schedule_link_in_graph.del();
00670     num_agents_suspended--; 
00671   }
00672 
00673 public:
00674 
00677 
00678   const size_t & get_num_agents() const 
00679   {
00680     CRITICAL_SECTION(graph_mutex);
00681 
00682     return num_agents; 
00683   }
00684 
00686   const size_t & get_num_threads() const
00687   {
00688     return num_threads;
00689   }
00690 
00698   Node * get_agent_node_location(Agent * agent) 
00699   {
00700     CRITICAL_SECTION(agent->mutex);
00701 
00702     I(not agent->location_link.is_empty());
00703 
00704     if (agent->in_node) 
00705       return static_cast<Node*>(agent->location);
00706 
00707     return NULL;
00708   }
00709 
00710 private:
00711 
00712       // inserción genérica en nodo o arco
00713       template <typename Location> inline static 
00714   void insert_agent(Agent * agent, Location * location)
00715   {
00716     agent->location = location;
00717 
00718     location->agent_list.append(&agent->location_link); 
00719   }
00720 
00721   
00722       template <typename Location> inline static 
00723   void remove_agent_from_location(Agent * agent, Location * location)
00724   {
00725     agent->location_link->del(); 
00726   }
00727 
00728 public:
00729 
00739   void leave_agent_from_node(Agent * agent)
00740   {
00741     I(agent->in_node);
00742 
00743     agent->location_link.del();
00744   }
00745 
00753   void enter_agent_in_node(Agent * agent, Node * node)
00754   {
00755     agent->in_node  = true;
00756     agent->location = node;
00757 
00758     insert_agent(agent, node);
00759   }
00760 
00761   Arc * get_agent_arc_location(Agent * agent)
00762   {
00763     CRITICAL_SECTION(agent->mutex);
00764 
00765     if (not agent->in_node) 
00766       return static_cast<Arc*>(agent->location);
00767 
00768     return NULL;
00769   }
00770 
00771       // cambia el agente a un nuevo nodo. Se asume que el mutex del nodo o
00772       // arco donde se encuentra el agente ya está tomado
00773   void set_agent_arc_location(Agent * agent, Arc * arc)
00774   {
00775     if (agent->agent_state == Agent::Suspended)
00776       throw std::domain_error("cannot set location of suspended agent");
00777 
00778     agent->location_link.del();
00779 
00780     agent->in_node = false;
00781     agent->location = arc;
00782 
00783     insert_agent(agent, arc); 
00784   }
00785 
00786   void enter_agent_in_arc(Agent *       agent, 
00787                  Arc *         arc, 
00788                  const bool &  with_mutex = false)
00789   {
00790     if (with_mutex)
00791       pthread_mutex_lock(&agent->mutex);
00792 
00793     {
00794       CRITICAL_SECTION(arc->mutex);
00795 
00796       agent->in_node  = false;
00797       agent->location = arc;
00798 
00799       insert_agent(agent, arc);
00800     }
00801 
00802     if (with_mutex)
00803       pthread_mutex_unlock(&agent->mutex);
00804   }
00805 
00806   void change_agent_arc_location(Agent * agent, Arc * arc)
00807   {
00808     CRITICAL_SECTION(agent->mutex);
00809 
00810     set_agent_arc_location(agent, arc);
00811   }
00812 
00813   bool & is_agent_in_node(Agent * agent) 
00814   {
00815     CRITICAL_SECTION(agent->mutex);
00816 
00817     return agent->in_node;
00818   }
00819 
00820   Agent * get_first_agent()
00821   {
00822     CRITICAL_SECTION(graph_mutex);
00823 
00824     if (get_num_agents() == 0)
00825       throw std::range_error("Graph has not agents ");
00826 
00827     return static_cast<Agent*>
00828       (Agent::agent_link_in_graph_to_Agent (&*agent_list.get_next()));
00829   } 
00830 
00831 private:
00832 
00833       // asume que mutex del grafo y del agente ya están tomados.
00834       // ATENCIÓN: INVOCANTE DEBE SOLTAR EL MUTEX Y LUEGO HACER DELETE 
00835   void __remove_agent_in_graph(Agent * agent)
00836   { 
00837     if (not agent->location_link.is_empty()) 
00838       {
00839      if (agent->in_node)
00840        pthread_mutex_lock(&static_cast<Node*>(agent->location)->mutex);
00841      else
00842        pthread_mutex_lock(&static_cast<Arc*>(agent->location)->mutex);
00843 
00844      agent->location_link.del();
00845 
00846      if (agent->in_node)
00847        pthread_mutex_unlock(&static_cast<Node*>(agent->location)->mutex);
00848      else
00849        pthread_mutex_unlock(&static_cast<Arc*>(agent->location)->mutex);
00850       }
00851 
00852     agent->agent_link_in_graph.del();
00853     num_agents--;
00854      
00855     agent->schedule_link_in_graph.del();
00856 
00857     switch (agent->agent_state)
00858       {
00859       case Agent::Deleting_From_Ready:
00860 
00861      num_agents_ready--; break;
00862 
00863       case Agent::Deleting_From_Suspended:
00864 
00865      num_agents_suspended--; break;
00866 
00867       default: EXIT("Invalid call (state %ld)", agent->agent_state);
00868       }
00869   }
00870 
00871 public:
00872 
00873   void remove_agent(Agent * agent)
00874   { 
00875     {
00876       CRITICAL_SECTION(agent->mutex);
00877 
00878       switch (agent->agent_state)
00879      {
00880      case Agent::Ready:
00881 
00882        agent->agent_state = Agent::Deleting_From_Ready;
00883 
00884        break; // eliminará agente después del switch
00885 
00886      case Agent::Suspended:
00887 
00888        agent->agent_state = Agent::Deleting_From_Suspended;
00889 
00890        break; // eliminará agente después del switch
00891 
00892      case Agent::Executing:
00893      case Agent::Suspending:
00894           // Eliminación será realizada por thread que está
00895           // ejecutando el agente
00896        agent->agent_state = Agent::Deleting_From_Ready; 
00897 
00898        return;
00899 
00900      case Agent::Deleting_From_Ready:
00901      case Agent::Deleting_From_Suspended:
00902 
00903        throw std::domain_error("Agent is already deleting");
00904 
00905      default: EXIT("Invalid agent state %ld", agent->agent_state);
00906      }
00907 
00908       {
00909      CRITICAL_SECTION(graph_mutex);
00910 
00911      __remove_agent_in_graph(agent);
00912       }
00913     }
00914 
00915     delete agent;
00916   }
00917 
00918 private:
00919 
00920   void create_thread(pthread_t & thread, void *( *start_routine)(void*))
00921   {
00922     int result = pthread_create(&thread, NULL, start_routine, this); 
00923 
00924     if (result != 0)
00925       {
00926      if (result == EAGAIN)
00927        throw std::bad_alloc();
00928      else
00929        throw std::exception();
00930       }
00931   }
00932 
00933       // retorna el índice dentro del arreglo threads[] de thread
00934   int search_thread(const pthread_t & thread)
00935   {
00936     return sequential_search(threads, thread, 0, num_threads - 1);
00937   }
00938 
00939   void wait_for_threads_termination()
00940   {
00941     for (int i = 0; i < num_threads; i++) 
00942       {
00943      int st = pthread_join(threads[i] , NULL);
00944 
00945      if (st != 0)
00946        EXIT("Error %d in pthread_join", st);
00947       }
00948   }
00949 
00950 public:
00951    
00952   void start_graph(const bool block_caller = true) 
00953   {
00954     CRITICAL_SECTION(graph_mutex);
00955 
00956     if (status == Running)
00957       throw std::domain_error("Graph is already running");
00958     
00959     status = Running;
00960 
00961         // crear las threads y colocarlas a ejecutar inmediatamente
00962     for (int i = 0; i < num_threads; i++) 
00963       create_thread(threads[i], agent_handler);
00964 
00965     if (block_caller)
00966       pthread_cond_wait(&block_starter_cond_var, &graph_mutex);
00967   }
00968 
00969   void stop_graph()
00970   {
00971     {
00972       CRITICAL_SECTION(graph_mutex);
00973 
00974       if (status == Stopped)
00975      throw std::domain_error("Graph is already stopped");
00976 
00977       if (status == Suspended)
00978      throw std::domain_error("Graph is suspended");
00979 
00980       status = Stopped;
00981 
00982       pthread_cond_broadcast(&graph_cond_var);
00983     }
00984 
00985     wait_for_threads_termination();
00986   }
00987 
00988   void suspend_graph()
00989   {
00990     CRITICAL_SECTION(graph_mutex);
00991 
00992     status = Suspended;
00993   }
00994 
00995   void resume_graph()
00996   {
00997     {
00998       CRITICAL_SECTION(graph_mutex);
00999 
01000       if (status != Suspended)
01001      throw std::domain_error("Graph has not been previously suspended");
01002 
01003       status = Running;
01004     }
01005     
01006     pthread_cond_broadcast(&graph_cond_var);
01007   }
01008 
01009   void clear_agent_list()
01010   {
01011     if (status == Running)
01012       suspend_graph();
01013 
01014     for (Dlink::Iterator it(&agent_list); it.has_current(); /* empty */)
01015       {
01016      Agent * agent = Agent::agent_link_in_graph_to_Agent(it.get_current());
01017 
01018         it.next();
01019         remove_agent(agent);
01020       }
01021    }
01022 
01025   const long & get_num_agents_ready() const 
01026   {
01027     CRITICAL_SECTION(graph_mutex);
01028 
01029     return num_agents_ready; 
01030   }
01031 
01032   const long & get_num_agents_suspended() const 
01033   {
01034     CRITICAL_SECTION(graph_mutex);
01035 
01036     return num_agents_suspended;
01037   }
01038 
01039 private:
01040 
01041   void __suspend_agent_in_graph(Agent * agent)
01042   {
01043     remove_agent_from_ready_queue(agent);
01044     insert_agent_in_suspended_queue(agent);
01045   }
01046 
01047 public:
01048     
01049   void suspend_agent(Agent * agent)
01050   {
01051     {     // validar que el agente se suspenda en un estado correcto
01052       CRITICAL_SECTION(agent->mutex);
01053 
01054       switch (agent->agent_state)
01055      {
01056      case Agent::Suspended:
01057 
01058        throw std::domain_error("Agent is already suspended");
01059 
01060      case Agent::Suspending:
01061 
01062        throw std::domain_error("Agent is already suspending");
01063 
01064      case Agent::Executing:
01065 
01066            /* suspención será realizada por run() luego de ejecución */
01067        agent->agent_state = Agent::Suspending; 
01068 
01069        return;
01070 
01071      case Agent::Ready: 
01072        
01073        agent->agent_state = Agent::Suspended;
01074 
01075        break;
01076 
01077      default: 
01078 
01079        EXIT("Invalid agent state %ld", agent->agent_state);
01080      }
01081     }
01082     
01083     CRITICAL_SECTION(graph_mutex);
01084 
01085     __suspend_agent_in_graph(agent);
01086   }
01087 
01088 private:
01089 
01090   void __resume_agent_in_graph(Agent * agent) 
01091   {
01092     remove_agent_from_suspended_queue(agent);
01093     insert_agent_in_ready_queue(agent);
01094   }
01095 
01096 public:
01097 
01098   void resume_agent(Agent * agent) 
01099   {
01100     {
01101       CRITICAL_SECTION(agent->mutex);
01102 
01103       switch (agent->agent_state)
01104      {
01105      case Agent::Ready: 
01106      case Agent::Executing:
01107        
01108          throw throw std::domain_error("Agent is not suspended");
01109 
01110      case Agent::Suspending:
01111      case Agent::Suspended:
01112 
01113        agent->agent_state = Agent::Ready;
01114 
01115        break;
01116 
01117      default:
01118 
01119         Exit("Invalid agent state %ld", agent->agent_state);
01120      }
01121     }
01122 
01123     CRITICAL_SECTION(graph_mutex);
01124 
01125     __resume_agent_in_graph(agent);
01126   }
01127 
01128   typedef void * (Callback)(void *);
01129 
01130 private:
01131 
01132   void * (*callback)(void *);
01133 
01134   int callback_freq;
01135 
01136   pthread_t callback_thread;
01137 
01138   void init()
01139   {
01140     threads = new pthread_t[num_threads];
01141 
01142     init_mutex(graph_mutex);
01143     pthread_cond_init(&graph_cond_var, NULL);
01144     pthread_cond_init(&block_starter_cond_var, NULL); 
01145   }
01146 
01147 public:
01148 
01149   Agent_Graph(const size_t & num_threads = 1) 
01150     : num_threads(num_threads), status(Init),
01151       num_agents(0), num_agents_ready(0), num_agents_suspended(0), 
01152       callback(NULL)
01153    {
01154      init();
01155    }
01156 
01157   virtual ~Agent_Graph()
01158   {
01159     {
01160       CRITICAL_SECTION(graph_mutex);
01161 
01162       if (status == Running)
01163      {
01164        status = Stopped;
01165 
01166        wait_for_threads_termination();
01167      }
01168     }
01169 
01170     clear_agent_list();
01171     destroy_mutex(graph_mutex);
01172     pthread_cond_destroy(&graph_cond_var);
01173     pthread_cond_destroy(&block_starter_cond_var);
01174 
01175     delete [] threads; 
01176   } 
01177 
01178   Agent_Graph(const Agent_Graph & g)
01179     : Base_Graph(g),
01180       num_threads(g.num_threads), status(Init),
01181       num_agents(0), num_agents_ready(), num_agents_suspended(0), 
01182       callback(g.callback)
01183    {
01184      init(); 
01185    }
01186 
01187   Agent_Graph & operator = (const Agent_Graph & g)
01188   {
01189     if (this == &g)
01190       return *this;
01191 
01192     clear_agent_list();
01193 
01194     copy_graph(*this, const_cast<Agent_Graph&>(g));
01195   }
01196 
01197       template <typename Location> 
01198   Agent * insert_agent_in_location(Agent *      agent,
01199                        Location *   location, 
01200                        const bool & suspended = false)
01201   {
01202         // puesto que agent debe ser "nuevo", sus datos no están manejados
01203         // por otra thread y por lo tanto no tienen necesidad de protegerse
01204     agent->agent_state = suspended ? Agent::Suspended : Agent::Ready;
01205       
01206     agent_list.append(&agent->agent_link_in_graph);
01207 
01208     insert_agent(agent, location);
01209 
01210     {
01211       CRITICAL_SECTION(graph_mutex);
01212 
01213       num_agents++;
01214 
01215       if (not suspended)
01216      insert_agent_in_ready_queue(agent);
01217       else
01218      insert_agent_in_suspended_queue(agent);
01219     }
01220 
01221     return agent;
01222   }
01223 
01224       template <typename Location> Agent * 
01225   create_agent_in_location(const Agent_Info &      agent_data, 
01226                   typename Agent::Transit fnct,
01227                   Location *              location, 
01228                   const bool &            suspended = false)
01229   {
01230         // puesto que esta función crea el agente y determina su
01231         // dirección, no es necesario proteger sus atributos con el
01232         // mutex
01233     Agent * agent = new Agent;
01234 
01235     agent->info    = agent_data;
01236     agent->transit = fnct;
01237 
01238     return insert_agent_in_location(agent, location, suspended);
01239   }
01240 
01241   Agent * create_agent_in_node(const Agent_Info &      agent_data, 
01242                       typename Agent::Transit fnct,
01243                       Node *                  node, 
01244                       const bool &            suspended = false)
01245   {
01246     return create_agent_in_location(agent_data, fnct, node, suspended);
01247   }
01248     
01249   Agent * create_agent_in_arc(const Agent_Info &      agent_data, 
01250                      typename Agent::Transit fnct,
01251                      Arc *                   arc,
01252                      const bool &            suspended = false)
01253   {
01254     return create_agent_in_location(agent_data, fnct, arc, suspended);
01255   }
01256 
01257       template <class Agent_Class>
01258   Agent_Class * insert_agent_in_node(Agent_Class * agent, 
01259                          Node *        node, 
01260                          const bool &  suspended = false)
01261   {
01262     Agent * ret_val = insert_agent_in_location(agent, node, suspended);
01263 
01264     return static_cast<Agent_Class*>(ret_val);
01265   }
01266     
01267     
01268 
01269   // TODO: inserción sin función de transición
01270 
01271       template <class Equal>
01272   Agent * search_agent(const Agent_Type & agent_data)
01273   {
01274     CRITICAL_SECTION(graph_mutex);
01275 
01276     for (Dlink::Iterator it(agent_list); it.has_current(); it.next()) 
01277       {
01278         Agent * agent = static_cast<Agent*>
01279        (Agent::agent_link_in_graph_to_Agent(it.get_current()));
01280 
01281         if (Equal () (agent->get_info(), agent_data)) 
01282           return agent;
01283       }
01284       
01285     return NULL;
01286   }
01287    
01288   Agent * search_agent(const Agent_Type & agent)
01289   {
01290     return search_agent<Aleph::equal_to<Agent_Type> >(agent);
01291   }
01292 
01293   int get_status()
01294   {
01295     CRITICAL_SECTION(graph_mutex);
01296 
01297     return status;
01298   }
01299 
01300 private:
01301 
01302   static void * callback_handler(void * cookie)
01303   {
01304     Agent_Graph * graph = static_cast<Agent_Graph*>(cookie);
01305 
01306     while (true)
01307       {
01308      CRITICAL_SECTION(graph->graph_mutex);
01309 
01310      switch (graph->status)
01311        {
01312        case Stopped: return NULL; // terminar thread
01313 
01314        case Suspended: // suspender la thread con variable condición
01315          pthread_cond_wait(&graph->graph_cond_var, &graph->graph_mutex);
01316          break;
01317          
01318        case Running:
01319 
01320          sleep(graph->callback_freq); // esperar 
01321 
01322          (*graph->callback)(cookie);  // invocar callback
01323 
01324          break; // case Running:
01325 
01326        default: EXIT("Invalid execution state %ld", graph->status);
01327        }
01328       }
01329   }
01330 
01331 public:
01332 
01333   void set_callback(Callback __callback, const int & freq = 2000)
01334   {
01335     CRITICAL_SECTION(graph_mutex);
01336 
01337     I(__callback != NULL);
01338 
01339     callback      = __callback;
01340     callback_freq = freq;
01341 
01342     create_thread(callback_thread, callback_handler); 
01343   }
01344 
01345   void cancel_callback()
01346   {
01347     CRITICAL_SECTION(graph_mutex);
01348 
01349     if (callback == NULL)
01350       throw std::domain_error("callback has not been set");
01351 
01352     pthread_cancel(callback_thread);
01353   }
01354 
01355 private:
01356 
01357   static void * agent_handler(void * cookie) 
01358   {
          // Thread entry point that executes the ready agents of a graph.
          //
          // cookie: pointer to the Agent_Graph, received as void *.
          // Returns NULL once the graph status is Stopped; in any other
          // case the loop keeps going.
          //
          // Each iteration opens a critical section on the graph mutex
          // (presumably released when the iteration scope ends -- confirm
          // against the CRITICAL_SECTION macro) and then acts according to
          // graph->status.
01359     Agent_Graph * graph = static_cast<Agent_Graph*>(cookie);
01360 
01361     while (true)
01362       {
01363      CRITICAL_SECTION(graph->graph_mutex);
01364 
01365      switch (graph->status) 
01366        {
01367        case Stopped: return NULL; // terminate the thread
01368 
01369        case Suspended: // park the thread on the condition variable
01370          pthread_cond_wait(&graph->graph_cond_var, &graph->graph_mutex);
01371          break;
01372          
01373        case Running: 
01374          {
01375            Agent * ready_agent = graph->remove_next_ready_agent();
01376        
01377            if (ready_agent == NULL) 
01378           { // no agent available: signal start_graph() and
01379             // park this thread
01380             pthread_cond_signal(&graph->block_starter_cond_var);
01381 
01382             pthread_cond_wait(&graph->graph_cond_var, 
01383                         &graph->graph_mutex);
01384           }
01385            else
01386           {      // release the graph mutex before executing the agent
01387             critical_section.unlock(); 
01388 
                    // execute() returning false sends the agent back to
                    // the ready queue; returning true means its state
                    // must be inspected and post-processed
01389             if (not ready_agent->execute(graph)) // run the agent
01390               {
01391                 critical_section.lock(); // retake the graph mutex
01392             
01393                 graph->insert_agent_in_ready_queue(ready_agent);
01394               }
01395             else
01396               {
01397                 critical_section.lock(); // retake the graph mutex
01398 
01399                 // Check the agent state and post-process it; the
01400                 // agent mutex is still held because the state is read
01401                 switch (ready_agent->agent_state) 
01402                {
01403                case Agent::Suspending:
01404                  // release the agent mutex: the state was already read
01405                  pthread_mutex_unlock(&ready_agent->mutex);
01406 
01407                  graph->__suspend_agent_in_graph(ready_agent);
01408                
01409                  break;
01410                
01411                case Agent::Deleting_From_Ready:
01412                case Agent::Deleting_From_Suspended:
01413 
                         // the agent mutex is released before the agent
                         // is taken out of the graph and destroyed
01414                  pthread_mutex_unlock(&ready_agent->mutex);
01415 
01416                  graph->__remove_agent_in_graph(ready_agent);
01417 
01418                  delete ready_agent;
01419 
01420                  break;
01421 
01422                default: EXIT("Invalid agent state %ld in run()",
01423                           ready_agent->agent_state);
01424                }
01425               } // end else of if (not ready_agent->execute(graph))
01426           }
01427            break; // case Running:
01428          }
01429      default: EXIT("Invalid execution state %ld", graph->status);
01430      }
01431       } // end while (true)
01432   }
01433 };  // end Agent_Graph 
01434 
01435 
01436 }
01437 
01438 # endif 

Leandro R. León