00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055 # ifndef TPL_AGENT_GRAPH_H
00056 # define TPL_AGENT_GRAPH_H
00057
00058 # include <iostream>
00059 # include <exception>
00060 # include <sys/types.h>
00061 # include <dlink.H>
00062 # include <tpl_concurrent_graph.H>
00063 # include <tpl_dynArray.H>
00064 # include <tpl_arrayQueue.H>
00065
00066 using namespace Aleph;
00067 using namespace std;
00068
00069 namespace Aleph
00070 {
00071
00072
00073
00074
00075 template <typename Agent_Info> class Agent;
00076
00077 template <typename Node_Info> class Agent_Node;
00078
00079
00084 template <typename Agents_Node_Info>
00085 class Agent_Node : public Concurrent_Node <Agents_Node_Info>
00086 {
00087
00088 public:
00089
00090 Dlink agent_list;
00091
00092 typedef Concurrent_Node<Agents_Node_Info> Base_Node;
00093
00094 typedef Agents_Node_Info Node_Type;
00095
00096 Agent_Node() : Base_Node() { }
00097
00098 Agent_Node(const Node_Type & info) : Base_Node(info) { }
00099
00100 virtual ~Agent_Node(){ }
00101 };
00102
00103
00108 template <typename Agents_Arc_Info>
00109 class Agent_Arc : public Concurrent_Arc<Agents_Arc_Info>
00110 {
00111
00112 public:
00113
00114 Dlink agent_list;
00115
00116 typedef Concurrent_Arc<Agents_Arc_Info> Base_Arc;
00117
00118 typedef Agents_Arc_Info Arc_Type;
00119
00120 Agent_Arc() : Base_Arc() { }
00121
00122 virtual ~Agent_Arc() { }
00123 };
00124
00125
00188 template <typename __Node,
00189 typename __Arc,
00190 typename Agent_Info
00191 >
00192 class Agent_Graph : public Concurrent_Graph<__Node, __Arc>
00193 {
00194 public:
00195
/// Node class of the graph (first template parameter).
00197 typedef __Node Node;
00198
/// Arc class of the graph (second template parameter).
00200 typedef __Arc Arc;
00201
/// Type of this instantiation of Agent_Graph.
00203 typedef Agent_Graph<__Node, __Arc, Agent_Info> graph_type;
00204
00205 private:
00206
00207
/// Execution state of the whole graph. Constructors set Init,
/// start_graph() and resume_graph() set Running, suspend_graph() sets
/// Suspended and stop_graph() (or the destructor) sets Stopped.
00208 enum Status {
00209 Init,
00210 Running,
00211 Suspended,
00212 Stopped
00213
00214 };
00215
/// Number of worker threads created by start_graph().
00216 const size_t num_threads;
/// Array of num_threads thread handles; allocated by init() and filled
/// by start_graph().
00217 pthread_t * threads;
/// Current execution state; accessed under graph_mutex by most methods.
00218 Status status;
00219
00220
/// Mutex taken (through CRITICAL_SECTION) by the methods that read or
/// modify the status, the agent counters and the scheduling queues.
00221 pthread_mutex_t graph_mutex;
00222
00223
00224
00225
00226
00227
00228
00229
00230
00231
00232
00233
00234
00235
00236
/// Condition variable on which the handler threads wait when the graph
/// is suspended or the ready queue is empty. It is signaled by
/// insert_agent_in_ready_queue() and broadcast by stop_graph() and
/// resume_graph().
00237 pthread_cond_t graph_cond_var;
00238
00239 public:
00240
00289 class Agent
00290 {
00291
00292 friend class Agent_Graph;
00293
00294 enum Agent_State
00295 {
00296 Ready,
00297 Executing,
00298 Suspending,
00299 Suspended,
00300 Deleting_From_Ready,
00301 Deleting_From_Suspended
00302 };
00303
00304
00305 pthread_mutex_t mutex;
00306
00307 Agent_State agent_state;
00308
00309 pthread_t thread_id;
00310
00311 Agent_Info info;
00312
00313
00314
00315 Dlink agent_link_in_graph;
00316
00317
00318
00319 Dlink schedule_link_in_graph;
00320
00321
00322
00323
00324
00325
00326
00327 Dlink location_link;
00328
00329 bool in_node;
00330
00331 void * location;
00332
00333 void * cookie;
00334
00335 public:
00336
00340 typedef void (*Transit) (Agent_Graph * ,
00341 Agent * ,
00342 void * );
00343
00344 static void transit_fct(Agent_Graph *, Agent *, void*)
00345 {
00346
00347 }
00348
00349
00350 private:
00351
00352 Transit transit;
00353
00354 public:
00355
00357 Agent() : location(NULL), cookie(NULL)
00358 {
00359 init_mutex(&mutex);
00360 }
00361
00364 Agent(const Agent_Info & agent_info, Transit __transit)
00365 : info(agent_info), location(NULL), cookie(NULL), transit(__transit)
00366 {
00367 init_mutex(&mutex);
00368 }
00369
00370 virtual ~Agent()
00371 {
00372 destroy_mutex(&mutex);
00373 }
00374
00376 typedef Agent_Info Agent_Type;
00377
00378 private:
00379
00380
00381
00382 bool execute(Agent_Graph * graph)
00383 {
00384 if (transit == NULL)
00385 throw std::domain_error("Transit function was not specified");
00386
00387 {
00388 CRITICAL_SECTION(mutex);
00389
00390 I(agent_state == Ready);
00391
00392 thread_id = pthread_self();
00393 agent_state = Executing;
00394 }
00395
00396 transit(graph, this, cookie);
00397
00398 {
00399 CRITICAL_SECTION(mutex);
00400
00401 if (agent_state == Executing)
00402 {
00403 agent_state = Ready;
00404
00405
00406 return false;
00407 }
00408
00409 critical_section.disallow_unlock();
00410
00411 return true;
00412 }
00413 }
00414
00415
00416 LINKNAME_TO_TYPE (Agent, agent_link_in_graph);
00417 LINKNAME_TO_TYPE (Agent, schedule_link_in_graph);
00418 LINKNAME_TO_TYPE (Agent, location_link);
00419
00420 public:
00421
00422
00423
00425 Agent_Info & get_info()
00426 {
00427 CRITICAL_SECTION(mutex);
00428
00429 return info;
00430 }
00431
00433 void *& get_cookie()
00434 {
00435 CRITICAL_SECTION(mutex);
00436
00437 return cookie;
00438 }
00439
00442 bool is_in_node()
00443 {
00444 CRITICAL_SECTION(mutex);
00445
00446 return in_node;
00447 }
00448
00453 struct Critical_Section : public UseMutex
00454 {
00455 Critical_Section(Agent * agent) : UseMutex(agent->mutex) { }
00456 };
00457 };
00458
00489 class Node_to_Node_Agent : public Agent
00490 {
00491
00492 friend class Agent_Graph;
00493
00494 public:
00495
00497 typedef Arc * (*Leave_Node_Fct)(Agent_Graph*, Node*, Node_to_Node_Agent*);
00498
00500 typedef bool (*Enter_Node_Fct)(Agent_Graph*, Node*, Node_to_Node_Agent*);
00501
00502 private:
00503
00504 Leave_Node_Fct leave_node_fct;
00505
00506 Enter_Node_Fct enter_node_fct;
00507
00508 static void transit_fct(Agent_Graph * g, Agent * __agent, void*)
00509 {
00510 Node_to_Node_Agent * agent = static_cast<Node_to_Node_Agent*>(__agent);
00511
00512 Arc * arc = NULL;
00513 Node * tgt_node = NULL;
00514
00515 I(agent->in_node);
00516
00517
00518 Node * curr_node = static_cast<Node*>(agent->location);
00519
00520
00521 CRITICAL_SECTION(curr_node->mutex);
00522
00523 if (agent->leave_node_fct != NULL)
00524
00525 arc = agent->leave_node_fct(g, curr_node, agent);
00526
00527 # ifdef nada
00528 if (arc == NULL)
00529 {
00530 g->remove_agent(agent);
00531
00532 return;
00533 }
00534 # endif
00535
00536 g->leave_agent_from_node(agent);
00537
00538 tgt_node = g->get_connected_node(arc, curr_node);
00539
00540 int lock_status = 0;
00541
00542 do
00543 lock_status = pthread_mutex_trylock(&tgt_node->mutex);
00544 while (lock_status == EBUSY);
00545
00546 if (lock_status != 0)
00547 throw std::domain_error("Mutex cannot be locked");
00548
00549
00550
00551 if (not agent->enter_node_fct(g, tgt_node, agent))
00552 {
00553 pthread_mutex_unlock(&tgt_node->mutex);
00554
00555 g->remove_agent(agent);
00556
00557 return;
00558 }
00559
00560 g->enter_agent_in_node(agent, tgt_node);
00561
00562 pthread_mutex_unlock(&tgt_node->mutex);
00563 }
00564
00565 public:
00566
00567 Node_to_Node_Agent()
00568 {
00569 leave_node_fct = NULL;
00570 enter_node_fct = NULL;
00571 }
00572
00573 Node_to_Node_Agent (const Agent_Info & agent_info,
00574 Leave_Node_Fct __leave_node_fct,
00575 Enter_Node_Fct __enter_node_fct)
00576 : Agent(agent_info, &transit_fct)
00577 {
00578 leave_node_fct = __leave_node_fct;
00579 enter_node_fct = __enter_node_fct;
00580 }
00581 };
00582
00583
/// Agent class derived from Agent that adds no members; presumably a
/// placeholder for agents that move between nodes and arcs (TODO confirm).
00584 class Node_to_Arc_Agent : public Agent
00585 {
00586
00587 };
00588
00589
/// Concurrent graph class from which Agent_Graph derives.
00590 typedef Concurrent_Graph<Node, Arc> Base_Graph;
00591
/// Type of the data stored in the nodes.
00593 typedef typename Node::Node_Type Node_Type;
00594
/// Type of the data stored in the arcs.
00596 typedef typename Arc::Arc_Type Arc_Type;
00597
/// Type of the data stored in the agents.
00599 typedef typename Agent::Agent_Type Agent_Type;
00600
00601 private:
00602
/// List of all the agents of the graph (linked through
/// Agent::agent_link_in_graph) and its cardinality.
00605 Dlink agent_list;
00606 size_t num_agents;
00607
/// Queue of agents eligible for execution (linked through
/// Agent::schedule_link_in_graph) and its cardinality.
00608 Dlink ready_queue;
00609 size_t num_agents_ready;
00610
/// Queue of suspended agents (same link as the ready queue, so an agent
/// is in at most one of both queues) and its cardinality.
00611 Dlink suspended_queue;
00612 size_t num_agents_suspended;
00613
00614
/// Condition variable on which start_graph() blocks its caller; it is
/// signaled by agent_handler() when it finds the ready queue empty.
00615 pthread_cond_t block_starter_cond_var;
00616
00617
00618
00619 void insert_agent_in_ready_queue(Agent * agent)
00620 {
00621 ready_queue.append(&agent->schedule_link_in_graph);
00622 num_agents_ready++;
00623 pthread_cond_signal(&graph_cond_var);
00624 }
00625
00626
00627
00628 Agent * remove_next_ready_agent()
00629 {
00630 if (not ready_queue.is_empty())
00631 {
00632 Dlink * link = ready_queue.get_next();
00633
00634 Agent * agent = Agent::schedule_link_in_graph_to_Agent(link);
00635
00636 Agent * result = static_cast<Agent*>(agent);
00637
00638 ready_queue.remove_next();
00639 num_agents_ready--;
00640
00641 return result;
00642 }
00643
00644 return NULL;
00645 }
00646
00647
00648
00649
00650 void remove_agent_from_ready_queue(Agent * agent)
00651 {
00652 num_agents_ready--;
00653 agent->schedule_link_in_graph.del();
00654 }
00655
00656
00657
00658 void insert_agent_in_suspended_queue(Agent * agent)
00659 {
00660 suspended_queue.append(&agent->schedule_link_in_graph);
00661 num_agents_suspended++;
00662 }
00663
00664
00665
00666
00667 void remove_agent_from_suspended_queue(Agent * agent)
00668 {
00669 agent->schedule_link_in_graph.del();
00670 num_agents_suspended--;
00671 }
00672
00673 public:
00674
00677
00678 const size_t & get_num_agents() const
00679 {
00680 CRITICAL_SECTION(graph_mutex);
00681
00682 return num_agents;
00683 }
00684
00686 const size_t & get_num_threads() const
00687 {
00688 return num_threads;
00689 }
00690
00698 Node * get_agent_node_location(Agent * agent)
00699 {
00700 CRITICAL_SECTION(agent->mutex);
00701
00702 I(not agent->location_link.is_empty());
00703
00704 if (agent->in_node)
00705 return static_cast<Node*>(agent->location);
00706
00707 return NULL;
00708 }
00709
00710 private:
00711
00712
00713 template <typename Location> inline static
00714 void insert_agent(Agent * agent, Location * location)
00715 {
00716 agent->location = location;
00717
00718 location->agent_list.append(&agent->location_link);
00719 }
00720
00721
00722 template <typename Location> inline static
00723 void remove_agent_from_location(Agent * agent, Location * location)
00724 {
00725 agent->location_link->del();
00726 }
00727
00728 public:
00729
00739 void leave_agent_from_node(Agent * agent)
00740 {
00741 I(agent->in_node);
00742
00743 agent->location_link.del();
00744 }
00745
00753 void enter_agent_in_node(Agent * agent, Node * node)
00754 {
00755 agent->in_node = true;
00756 agent->location = node;
00757
00758 insert_agent(agent, node);
00759 }
00760
00761 Arc * get_agent_arc_location(Agent * agent)
00762 {
00763 CRITICAL_SECTION(agent->mutex);
00764
00765 if (not agent->in_node)
00766 return static_cast<Arc*>(agent->location);
00767
00768 return NULL;
00769 }
00770
00771
00772
00773 void set_agent_arc_location(Agent * agent, Arc * arc)
00774 {
00775 if (agent->agent_state == Agent::Suspended)
00776 throw std::domain_error("cannot set location of suspended agent");
00777
00778 agent->location_link.del();
00779
00780 agent->in_node = false;
00781 agent->location = arc;
00782
00783 insert_agent(agent, arc);
00784 }
00785
00786 void enter_agent_in_arc(Agent * agent,
00787 Arc * arc,
00788 const bool & with_mutex = false)
00789 {
00790 if (with_mutex)
00791 pthread_mutex_lock(&agent->mutex);
00792
00793 {
00794 CRITICAL_SECTION(arc->mutex);
00795
00796 agent->in_node = false;
00797 agent->location = arc;
00798
00799 insert_agent(agent, arc);
00800 }
00801
00802 if (with_mutex)
00803 pthread_mutex_unlock(&agent->mutex);
00804 }
00805
00806 void change_agent_arc_location(Agent * agent, Arc * arc)
00807 {
00808 CRITICAL_SECTION(agent->mutex);
00809
00810 set_agent_arc_location(agent, arc);
00811 }
00812
00813 bool & is_agent_in_node(Agent * agent)
00814 {
00815 CRITICAL_SECTION(agent->mutex);
00816
00817 return agent->in_node;
00818 }
00819
00820 Agent * get_first_agent()
00821 {
00822 CRITICAL_SECTION(graph_mutex);
00823
00824 if (get_num_agents() == 0)
00825 throw std::range_error("Graph has not agents ");
00826
00827 return static_cast<Agent*>
00828 (Agent::agent_link_in_graph_to_Agent (&*agent_list.get_next()));
00829 }
00830
00831 private:
00832
00833
00834
00835 void __remove_agent_in_graph(Agent * agent)
00836 {
00837 if (not agent->location_link.is_empty())
00838 {
00839 if (agent->in_node)
00840 pthread_mutex_lock(&static_cast<Node*>(agent->location)->mutex);
00841 else
00842 pthread_mutex_lock(&static_cast<Arc*>(agent->location)->mutex);
00843
00844 agent->location_link.del();
00845
00846 if (agent->in_node)
00847 pthread_mutex_unlock(&static_cast<Node*>(agent->location)->mutex);
00848 else
00849 pthread_mutex_unlock(&static_cast<Arc*>(agent->location)->mutex);
00850 }
00851
00852 agent->agent_link_in_graph.del();
00853 num_agents--;
00854
00855 agent->schedule_link_in_graph.del();
00856
00857 switch (agent->agent_state)
00858 {
00859 case Agent::Deleting_From_Ready:
00860
00861 num_agents_ready--; break;
00862
00863 case Agent::Deleting_From_Suspended:
00864
00865 num_agents_suspended--; break;
00866
00867 default: EXIT("Invalid call (state %ld)", agent->agent_state);
00868 }
00869 }
00870
00871 public:
00872
00873 void remove_agent(Agent * agent)
00874 {
00875 {
00876 CRITICAL_SECTION(agent->mutex);
00877
00878 switch (agent->agent_state)
00879 {
00880 case Agent::Ready:
00881
00882 agent->agent_state = Agent::Deleting_From_Ready;
00883
00884 break;
00885
00886 case Agent::Suspended:
00887
00888 agent->agent_state = Agent::Deleting_From_Suspended;
00889
00890 break;
00891
00892 case Agent::Executing:
00893 case Agent::Suspending:
00894
00895
00896 agent->agent_state = Agent::Deleting_From_Ready;
00897
00898 return;
00899
00900 case Agent::Deleting_From_Ready:
00901 case Agent::Deleting_From_Suspended:
00902
00903 throw std::domain_error("Agent is already deleting");
00904
00905 default: EXIT("Invalid agent state %ld", agent->agent_state);
00906 }
00907
00908 {
00909 CRITICAL_SECTION(graph_mutex);
00910
00911 __remove_agent_in_graph(agent);
00912 }
00913 }
00914
00915 delete agent;
00916 }
00917
00918 private:
00919
00920 void create_thread(pthread_t & thread, void *( *start_routine)(void*))
00921 {
00922 int result = pthread_create(&thread, NULL, start_routine, this);
00923
00924 if (result != 0)
00925 {
00926 if (result == EAGAIN)
00927 throw std::bad_alloc();
00928 else
00929 throw std::exception();
00930 }
00931 }
00932
00933
00934 int search_thread(const pthread_t & thread)
00935 {
00936 return sequential_search(threads, thread, 0, num_threads - 1);
00937 }
00938
00939 void wait_for_threads_termination()
00940 {
00941 for (int i = 0; i < num_threads; i++)
00942 {
00943 int st = pthread_join(threads[i] , NULL);
00944
00945 if (st != 0)
00946 EXIT("Error %d in pthread_join", st);
00947 }
00948 }
00949
00950 public:
00951
00952 void start_graph(const bool block_caller = true)
00953 {
00954 CRITICAL_SECTION(graph_mutex);
00955
00956 if (status == Running)
00957 throw std::domain_error("Graph is already running");
00958
00959 status = Running;
00960
00961
00962 for (int i = 0; i < num_threads; i++)
00963 create_thread(threads[i], agent_handler);
00964
00965 if (block_caller)
00966 pthread_cond_wait(&block_starter_cond_var, &graph_mutex);
00967 }
00968
00969 void stop_graph()
00970 {
00971 {
00972 CRITICAL_SECTION(graph_mutex);
00973
00974 if (status == Stopped)
00975 throw std::domain_error("Graph is already stopped");
00976
00977 if (status == Suspended)
00978 throw std::domain_error("Graph is suspended");
00979
00980 status = Stopped;
00981
00982 pthread_cond_broadcast(&graph_cond_var);
00983 }
00984
00985 wait_for_threads_termination();
00986 }
00987
00988 void suspend_graph()
00989 {
00990 CRITICAL_SECTION(graph_mutex);
00991
00992 status = Suspended;
00993 }
00994
00995 void resume_graph()
00996 {
00997 {
00998 CRITICAL_SECTION(graph_mutex);
00999
01000 if (status != Suspended)
01001 throw std::domain_error("Graph has not been previously suspended");
01002
01003 status = Running;
01004 }
01005
01006 pthread_cond_broadcast(&graph_cond_var);
01007 }
01008
01009 void clear_agent_list()
01010 {
01011 if (status == Running)
01012 suspend_graph();
01013
01014 for (Dlink::Iterator it(&agent_list); it.has_current(); )
01015 {
01016 Agent * agent = Agent::agent_link_in_graph_to_Agent(it.get_current());
01017
01018 it.next();
01019 remove_agent(agent);
01020 }
01021 }
01022
01025 const long & get_num_agents_ready() const
01026 {
01027 CRITICAL_SECTION(graph_mutex);
01028
01029 return num_agents_ready;
01030 }
01031
01032 const long & get_num_agents_suspended() const
01033 {
01034 CRITICAL_SECTION(graph_mutex);
01035
01036 return num_agents_suspended;
01037 }
01038
01039 private:
01040
01041 void __suspend_agent_in_graph(Agent * agent)
01042 {
01043 remove_agent_from_ready_queue(agent);
01044 insert_agent_in_suspended_queue(agent);
01045 }
01046
01047 public:
01048
01049 void suspend_agent(Agent * agent)
01050 {
01051 {
01052 CRITICAL_SECTION(agent->mutex);
01053
01054 switch (agent->agent_state)
01055 {
01056 case Agent::Suspended:
01057
01058 throw std::domain_error("Agent is already suspended");
01059
01060 case Agent::Suspending:
01061
01062 throw std::domain_error("Agent is already suspending");
01063
01064 case Agent::Executing:
01065
01066
01067 agent->agent_state = Agent::Suspending;
01068
01069 return;
01070
01071 case Agent::Ready:
01072
01073 agent->agent_state = Agent::Suspended;
01074
01075 break;
01076
01077 default:
01078
01079 EXIT("Invalid agent state %ld", agent->agent_state);
01080 }
01081 }
01082
01083 CRITICAL_SECTION(graph_mutex);
01084
01085 __suspend_agent_in_graph(agent);
01086 }
01087
01088 private:
01089
01090 void __resume_agent_in_graph(Agent * agent)
01091 {
01092 remove_agent_from_suspended_queue(agent);
01093 insert_agent_in_ready_queue(agent);
01094 }
01095
01096 public:
01097
01098 void resume_agent(Agent * agent)
01099 {
01100 {
01101 CRITICAL_SECTION(agent->mutex);
01102
01103 switch (agent->agent_state)
01104 {
01105 case Agent::Ready:
01106 case Agent::Executing:
01107
01108 throw throw std::domain_error("Agent is not suspended");
01109
01110 case Agent::Suspending:
01111 case Agent::Suspended:
01112
01113 agent->agent_state = Agent::Ready;
01114
01115 break;
01116
01117 default:
01118
01119 Exit("Invalid agent state %ld", agent->agent_state);
01120 }
01121 }
01122
01123 CRITICAL_SECTION(graph_mutex);
01124
01125 __resume_agent_in_graph(agent);
01126 }
01127
/// Type of the function that set_callback() receives. It is invoked by
/// callback_handler() with the graph as argument.
01128 typedef void * (Callback)(void *);
01129
01130 private:
01131
/// Callback currently installed; NULL if set_callback() was not called.
01132 void * (*callback)(void *);
01133
/// Argument passed to sleep() by callback_handler() before each
/// invocation of the callback.
01134 int callback_freq;
01135
/// Handle of the thread created by set_callback().
01136 pthread_t callback_thread;
01137
01138 void init()
01139 {
01140 threads = new pthread_t[num_threads];
01141
01142 init_mutex(graph_mutex);
01143 pthread_cond_init(&graph_cond_var, NULL);
01144 pthread_cond_init(&block_starter_cond_var, NULL);
01145 }
01146
01147 public:
01148
01149 Agent_Graph(const size_t & num_threads = 1)
01150 : num_threads(num_threads), status(Init),
01151 num_agents(0), num_agents_ready(0), num_agents_suspended(0),
01152 callback(NULL)
01153 {
01154 init();
01155 }
01156
01157 virtual ~Agent_Graph()
01158 {
01159 {
01160 CRITICAL_SECTION(graph_mutex);
01161
01162 if (status == Running)
01163 {
01164 status = Stopped;
01165
01166 wait_for_threads_termination();
01167 }
01168 }
01169
01170 clear_agent_list();
01171 destroy_mutex(graph_mutex);
01172 pthread_cond_destroy(&graph_cond_var);
01173 pthread_cond_destroy(&block_starter_cond_var);
01174
01175 delete [] threads;
01176 }
01177
01178 Agent_Graph(const Agent_Graph & g)
01179 : Base_Graph(g),
01180 num_threads(g.num_threads), status(Init),
01181 num_agents(0), num_agents_ready(), num_agents_suspended(0),
01182 callback(g.callback)
01183 {
01184 init();
01185 }
01186
01187 Agent_Graph & operator = (const Agent_Graph & g)
01188 {
01189 if (this == &g)
01190 return *this;
01191
01192 clear_agent_list();
01193
01194 copy_graph(*this, const_cast<Agent_Graph&>(g));
01195 }
01196
01197 template <typename Location>
01198 Agent * insert_agent_in_location(Agent * agent,
01199 Location * location,
01200 const bool & suspended = false)
01201 {
01202
01203
01204 agent->agent_state = suspended ? Agent::Suspended : Agent::Ready;
01205
01206 agent_list.append(&agent->agent_link_in_graph);
01207
01208 insert_agent(agent, location);
01209
01210 {
01211 CRITICAL_SECTION(graph_mutex);
01212
01213 num_agents++;
01214
01215 if (not suspended)
01216 insert_agent_in_ready_queue(agent);
01217 else
01218 insert_agent_in_suspended_queue(agent);
01219 }
01220
01221 return agent;
01222 }
01223
01224 template <typename Location> Agent *
01225 create_agent_in_location(const Agent_Info & agent_data,
01226 typename Agent::Transit fnct,
01227 Location * location,
01228 const bool & suspended = false)
01229 {
01230
01231
01232
01233 Agent * agent = new Agent;
01234
01235 agent->info = agent_data;
01236 agent->transit = fnct;
01237
01238 return insert_agent_in_location(agent, location, suspended);
01239 }
01240
01241 Agent * create_agent_in_node(const Agent_Info & agent_data,
01242 typename Agent::Transit fnct,
01243 Node * node,
01244 const bool & suspended = false)
01245 {
01246 return create_agent_in_location(agent_data, fnct, node, suspended);
01247 }
01248
01249 Agent * create_agent_in_arc(const Agent_Info & agent_data,
01250 typename Agent::Transit fnct,
01251 Arc * arc,
01252 const bool & suspended = false)
01253 {
01254 return create_agent_in_location(agent_data, fnct, arc, suspended);
01255 }
01256
01257 template <class Agent_Class>
01258 Agent_Class * insert_agent_in_node(Agent_Class * agent,
01259 Node * node,
01260 const bool & suspended = false)
01261 {
01262 Agent * ret_val = insert_agent_in_location(agent, node, suspended);
01263
01264 return static_cast<Agent_Class*>(ret_val);
01265 }
01266
01267
01268
01269
01270
01271 template <class Equal>
01272 Agent * search_agent(const Agent_Type & agent_data)
01273 {
01274 CRITICAL_SECTION(graph_mutex);
01275
01276 for (Dlink::Iterator it(agent_list); it.has_current(); it.next())
01277 {
01278 Agent * agent = static_cast<Agent*>
01279 (Agent::agent_link_in_graph_to_Agent(it.get_current()));
01280
01281 if (Equal () (agent->get_info(), agent_data))
01282 return agent;
01283 }
01284
01285 return NULL;
01286 }
01287
01288 Agent * search_agent(const Agent_Type & agent)
01289 {
01290 return search_agent<Aleph::equal_to<Agent_Type> >(agent);
01291 }
01292
01293 int get_status()
01294 {
01295 CRITICAL_SECTION(graph_mutex);
01296
01297 return status;
01298 }
01299
01300 private:
01301
01302 static void * callback_handler(void * cookie)
01303 {
01304 Agent_Graph * graph = static_cast<Agent_Graph*>(cookie);
01305
01306 while (true)
01307 {
01308 CRITICAL_SECTION(graph->graph_mutex);
01309
01310 switch (graph->status)
01311 {
01312 case Stopped: return NULL;
01313
01314 case Suspended:
01315 pthread_cond_wait(&graph->graph_cond_var, &graph->graph_mutex);
01316 break;
01317
01318 case Running:
01319
01320 sleep(graph->callback_freq);
01321
01322 (*graph->callback)(cookie);
01323
01324 break;
01325
01326 default: EXIT("Invalid execution state %ld", graph->status);
01327 }
01328 }
01329 }
01330
01331 public:
01332
01333 void set_callback(Callback __callback, const int & freq = 2000)
01334 {
01335 CRITICAL_SECTION(graph_mutex);
01336
01337 I(__callback != NULL);
01338
01339 callback = __callback;
01340 callback_freq = freq;
01341
01342 create_thread(callback_thread, callback_handler);
01343 }
01344
01345 void cancel_callback()
01346 {
01347 CRITICAL_SECTION(graph_mutex);
01348
01349 if (callback == NULL)
01350 throw std::domain_error("callback has not been set");
01351
01352 pthread_cancel(callback_thread);
01353 }
01354
01355 private:
01356
01357 static void * agent_handler(void * cookie)
01358 {
01359 Agent_Graph * graph = static_cast<Agent_Graph*>(cookie);
01360
01361 while (true)
01362 {
01363 CRITICAL_SECTION(graph->graph_mutex);
01364
01365 switch (graph->status)
01366 {
01367 case Stopped: return NULL;
01368
01369 case Suspended:
01370 pthread_cond_wait(&graph->graph_cond_var, &graph->graph_mutex);
01371 break;
01372
01373 case Running:
01374 {
01375 Agent * ready_agent = graph->remove_next_ready_agent();
01376
01377 if (ready_agent == NULL)
01378 {
01379
01380 pthread_cond_signal(&graph->block_starter_cond_var);
01381
01382 pthread_cond_wait(&graph->graph_cond_var,
01383 &graph->graph_mutex);
01384 }
01385 else
01386 {
01387 critical_section.unlock();
01388
01389 if (not ready_agent->execute(graph))
01390 {
01391 critical_section.lock();
01392
01393 graph->insert_agent_in_ready_queue(ready_agent);
01394 }
01395 else
01396 {
01397 critical_section.lock();
01398
01399
01400
01401 switch (ready_agent->agent_state)
01402 {
01403 case Agent::Suspending:
01404
01405 pthread_mutex_unlock(&ready_agent->mutex);
01406
01407 graph->__suspend_agent_in_graph(ready_agent);
01408
01409 break;
01410
01411 case Agent::Deleting_From_Ready:
01412 case Agent::Deleting_From_Suspended:
01413
01414 pthread_mutex_unlock(&ready_agent->mutex);
01415
01416 graph->__remove_agent_in_graph(ready_agent);
01417
01418 delete ready_agent;
01419
01420 break;
01421
01422 default: EXIT("Invalid agent state %ld in run()",
01423 ready_agent->agent_state);
01424 }
01425 }
01426 }
01427 break;
01428 }
01429 default: EXIT("Invalid execution state %ld", graph->status);
01430 }
01431 }
01432 }
01433 };
01434
01435
01436 }
01437
01438 # endif