Sierra Toolkit  Version of the Day
RuntimeMessage.cpp
1 /*------------------------------------------------------------------------*/
2 /* Copyright 2010 Sandia Corporation. */
3 /* Under terms of Contract DE-AC04-94AL85000, there is a non-exclusive */
4 /* license for use of this work by or on behalf of the U.S. Government. */
5 /* Export of this program may require a license from the */
6 /* United States Government. */
7 /*------------------------------------------------------------------------*/
8 
9 #include <list>
10 #include <string>
11 #include <sstream>
12 #include <utility>
13 #include <vector>
14 #include <boost/unordered_map.hpp>
15 
17 #include <stk_util/environment/ReportHandler.hpp>
18 #include <stk_util/util/Bootstrap.hpp>
19 #include <stk_util/util/Marshal.hpp>
20 
21 namespace stk_classic {
22 
23 MessageCode
25 
26 namespace {
27 
28 void bootstrap()
29 {
30  register_message_type(MSG_WARNING, 10000000, "warning");
31  register_message_type(MSG_DOOMED, 10000000, "error");
32  register_message_type(MSG_EXCEPTION, 1000000, "exception");
33  register_message_type(MSG_INFORMATION, 1000000, "informational");
34 }
35 
36 stk_classic::Bootstrap x(bootstrap);
37 
38 typedef std::pair<MessageId, std::string> MessageKey;
39 
40 typedef boost::unordered_map<MessageKey, Throttle> MessageIdMap;
41 
42 MessageIdMap s_messageIdMap;
43 
44 MessageIdMap s_deferredMessageIdMap;
45 
46 struct DeferredMessage
47 {
48  DeferredMessage()
49  {}
50 
51  DeferredMessage(
52  size_t type,
53  MessageId message_id,
54  size_t throttle_cutoff,
55  int throttle_group,
56  const std::string & header,
57  const std::string & aggregate)
58  : m_type(type),
59  m_messageId(message_id),
60  m_rank(0),
61  m_throttleCutoff(throttle_cutoff),
62  m_throttleGroup(throttle_group),
63  m_header(header),
64  m_aggregate(aggregate)
65  {}
66 
67  size_t m_type;
68  MessageId m_messageId;
69  int m_rank;
70  size_t m_throttleCutoff;
71  int m_throttleGroup;
72  std::string m_header;
73  std::string m_aggregate;
74 };
75 
76 typedef std::vector<DeferredMessage> DeferredMessageVector;
77 
78 struct DeferredMessageLess : public std::binary_function<DeferredMessage, DeferredMessage, bool>
79 {
80  bool operator()(const DeferredMessage &key_1, const DeferredMessage &key_2) const {
81  return (key_1.m_type < key_2.m_type)
82  || (!(key_2.m_type < key_1.m_type) && key_1.m_messageId < key_2.m_messageId)
83  || (!(key_2.m_type < key_1.m_type) && !(key_2.m_messageId < key_1.m_messageId) && key_1.m_header < key_2.m_header);
84  }
85 };
86 
87 DeferredMessageVector s_deferredMessageVector;
88 
89 struct MessageTypeInfo
90 {
91  MessageTypeInfo()
92  : m_count(0),
93  m_maxCount(10000000),
94  m_name("unknown")
95  {}
96 
97  unsigned m_count;
98  unsigned m_maxCount;
99  std::string m_name;
100 };
101 
102 typedef boost::unordered_map<unsigned, MessageTypeInfo> MessageTypeInfoMap;
103 
104 MessageTypeInfoMap s_messageTypeInfo;
105 
106 MessageTypeInfo &
107 get_message_type_info(
108  unsigned type)
109 {
110  MessageTypeInfoMap::iterator it = s_messageTypeInfo.find(type & MSG_TYPE_MASK);
111  if (it != s_messageTypeInfo.end())
112  return (*it).second;
113  else
114  return s_messageTypeInfo[type & MSG_TYPE_MASK];
115 }
116 
117 
118 enum CutoffStatus {
119  MSG_DISPLAY = 0,
120  MSG_CUTOFF = 1,
121  MSG_CUTOFF_EXCEEDED = 2
122 };
123 
124 
125 CutoffStatus
126 count_message(
127  MessageId message_id,
128  const char * message,
129  const Throttle & throttle)
130 {
131  std::pair<MessageIdMap::iterator, bool> res = s_messageIdMap.insert(MessageIdMap::value_type(MessageIdMap::key_type(message_id, message), throttle));
132  size_t count = ++(*res.first).second.m_count;
133 
134  if (count < (*res.first).second.m_cutoff)
135  return MSG_DISPLAY;
136  else if (count == (*res.first).second.m_cutoff)
137  return MSG_CUTOFF;
138  else
139  return MSG_CUTOFF_EXCEEDED;
140 }
141 
142 Marshal &operator<<(Marshal &mout, const DeferredMessage &s) {
143  mout << s.m_type << s.m_messageId << s.m_rank << s.m_throttleGroup << s.m_throttleCutoff << s.m_header << s.m_aggregate;
144  return mout;
145 }
146 
147 Marshal &operator>>(Marshal &min, DeferredMessage &s) {
148  min >> s.m_type >> s.m_messageId >> s.m_rank >> s.m_throttleGroup >> s.m_throttleCutoff >> s.m_header >> s.m_aggregate;
149  return min;
150 }
151 
152 } // namespace <empty>
153 
154 
155 void
157  unsigned message_type,
158  unsigned max_count,
159  const char * name)
160 {
161  MessageTypeInfo &message_info = get_message_type_info(message_type);
162 
163  message_info.m_maxCount = max_count;
164  message_info.m_name = name;
165 }
166 
167 
168 unsigned
170  unsigned message_type)
171 {
172  return get_message_type_info(message_type).m_count;
173 }
174 
175 
176 unsigned
177 increment_message_count(
178  unsigned message_type)
179 {
180  return ++get_message_type_info(message_type).m_count;
181 }
182 
183 
184 void
186  unsigned message_type)
187 {
188  get_message_type_info(message_type).m_count = 0;
189 }
190 
191 
192 const std::string &
194  unsigned message_type)
195 {
196  return get_message_type_info(message_type).m_name;
197 }
198 
199 
200 void
202  unsigned message_type,
203  unsigned max_count)
204 {
205  get_message_type_info(message_type).m_maxCount = max_count;
206 }
207 
208 
209 unsigned
211  unsigned message_type)
212 {
213  return get_message_type_info(message_type).m_maxCount;
214 }
215 
216 
217 void
219  const char * message,
220  unsigned message_type,
221  const MessageCode & message_code)
222 {
223  if (message_type & MSG_DEFERRED)
224  report(message, message_type);
225 
226  else {
227  unsigned count = increment_message_count(message_type);
228  unsigned max_count = get_max_message_count(message_type);
229 
230  if (count == max_count) {
231  report(message, message_type);
232 
233  std::ostringstream s;
234  s << "Maximum " << get_message_name(message_type) << " count has been exceeded and will no longer be displayed";
235  report(s.str().c_str(), MSG_WARNING | MSG_SYMMETRIC);
236  }
237 
238  else if (count < max_count) {
239  CutoffStatus cutoff = count_message(message_code.m_id, "", message_code.m_throttle);
240 
241  if (cutoff == MSG_CUTOFF) {
242  report(message, message_type);
243 
244  std::ostringstream s;
245  s << "Maximum count for this " << get_message_name(message_type) << " has been exceeded and will no longer be displayed";
246  report(s.str().c_str(), MSG_WARNING | MSG_SYMMETRIC);
247  }
248 
249  else if (cutoff == MSG_DISPLAY)
250  report(message, message_type);
251  }
252  }
253 }
254 
255 
256 void
258  int throttle_group)
259 {
260  for (MessageIdMap::iterator it = s_messageIdMap.begin(); it != s_messageIdMap.end(); ++it)
261  if ((*it).second.m_group == throttle_group)
262  (*it).second.m_count = 0;
263 }
264 
265 
266 void
268  int message_type,
269  MessageId message_id,
270  size_t throttle_cutoff,
271  int throttle_group,
272  const char * header,
273  const char * aggegrate)
274 {
275  std::ostringstream s;
276  s << header << " " << aggegrate;
277 
278  report(s.str().c_str(), message_type | MSG_DEFERRED);
279 
280  std::pair<MessageIdMap::iterator, bool> res = s_deferredMessageIdMap.insert(MessageIdMap::value_type(MessageIdMap::key_type(message_id, header), Throttle(throttle_cutoff, throttle_group)));
281  size_t count = ++(*res.first).second.m_count;
282 
283  if (count <= throttle_cutoff)
284  s_deferredMessageVector.push_back(DeferredMessage(message_type, message_id, throttle_cutoff, throttle_group, header, aggegrate));
285 }
286 
287 
289 
290 void
292  ParallelMachine comm)
293 {
294 #ifdef STK_HAS_MPI
295  const int p_root = 0 ;
296  const int p_size = parallel_machine_size(comm);
297  const int p_rank = parallel_machine_rank(comm);
298 
299  for (DeferredMessageVector::iterator it = s_deferredMessageVector.begin(); it != s_deferredMessageVector.end(); ++it)
300  (*it).m_rank = p_rank;
301 
302  Marshal mout;
303  mout << s_deferredMessageVector;
304 
305  DeferredMessageVector deferred_message_vector;
306 
307  // Gather the send counts on root processor
308  std::string send_string(mout.stream.str());
309  int send_count = send_string.size();
310  std::vector<int> recv_count(p_size, 0);
311  int * const recv_count_ptr = &recv_count[0] ;
312 
313  int result = MPI_Gather(&send_count, 1, MPI_INT,
314  recv_count_ptr, 1, MPI_INT,
315  p_root, comm);
316  if (MPI_SUCCESS != result) {
317  std::ostringstream message ;
318  message << "stk_classic::report_deferred_messages FAILED: MPI_Gather = " << result ;
319  throw std::runtime_error(message.str());
320  }
321 
322  // Receive counts are only non-zero on the root processor:
323  std::vector<int> recv_displ(p_size + 1, 0);
324 
325  for (int i = 0 ; i < p_size ; ++i) {
326  recv_displ[i + 1] = recv_displ[i] + recv_count[i] ;
327  }
328 
329  const int recv_size = recv_displ[p_size] ;
330 
331  std::vector<char> buffer(recv_size);
332 
333  {
334  const char * const send_ptr = send_string.data();
335  char * const recv_ptr = recv_size ? & buffer[0] : (char *) NULL ;
336  int * const recv_displ_ptr = & recv_displ[0] ;
337 
338  result = MPI_Gatherv((void *) send_ptr, send_count, MPI_CHAR,
339  recv_ptr, recv_count_ptr, recv_displ_ptr, MPI_CHAR,
340  p_root, comm);
341  if (MPI_SUCCESS != result) {
342  std::ostringstream message ;
343  message << "stk_classic::report_deferred_messages FAILED: MPI_Gatherv = " << result ;
344  throw std::runtime_error(message.str());
345  }
346 
347 
348  if (p_rank == p_root) {
349  for (int i = 0; i < p_size; ++i) {
350  Marshal min(std::string(recv_ptr + recv_displ[i], recv_ptr + recv_displ[i + 1]));
351  min >> deferred_message_vector;
352  }
353 
354  std::stable_sort(deferred_message_vector.begin(), deferred_message_vector.end(), DeferredMessageLess());
355 
356  DeferredMessageVector::const_iterator current_message_it = deferred_message_vector.begin();
357  while (current_message_it != deferred_message_vector.end()) {
358  const DeferredMessage &current_message = (*current_message_it);
359 
360  DeferredMessageVector::const_iterator end = current_message_it + 1;
361  while (end != deferred_message_vector.end()
362  && current_message.m_messageId == (*end).m_messageId
363  && current_message.m_header == (*end).m_header)
364  ++end;
365 
366  std::ostringstream s;
367 
368  s << current_message.m_header << current_message.m_aggregate;
369 
370  for (DeferredMessageVector::const_iterator it1 = current_message_it + 1; it1 != end; ++it1) {
371  bool print = true;
372  for (DeferredMessageVector::const_iterator it2 = current_message_it; it2 != it1; ++it2)
373  if ((*it1).m_aggregate == (*it2).m_aggregate) {
374  print = false;
375  break;
376  }
377  if (print) {
378  if (!(*it1).m_aggregate.find('\n'))
379  s << ", ";
380  s << (*it1).m_aggregate;
381  }
382  }
383 
384  report_message(s.str().c_str(), current_message.m_type | stk_classic::MSG_SYMMETRIC, MessageCode(current_message.m_messageId, current_message.m_throttleCutoff, current_message.m_throttleGroup));
385 
386  current_message_it = end;
387  }
388  }
389  }
390 
391  s_deferredMessageIdMap.clear();
392  s_deferredMessageVector.clear();
393 #endif
394 }
395 
396 
397 void
399  ParallelMachine comm,
400  std::ostringstream & os,
401  const char * separator)
402 {
403 #ifdef STK_HAS_MPI
404  std::string message = os.str();
405  os.str("");
406 
407  const int p_root = 0 ;
408  const int p_size = parallel_machine_size(comm);
409  const int p_rank = parallel_machine_rank(comm);
410 
411  int result ;
412 
413  // Gather the send counts on root processor
414 
415  int send_count = message.size();
416 
417  std::vector<int> recv_count(p_size, 0);
418 
419  int * const recv_count_ptr = & recv_count[0] ;
420 
421  result = MPI_Gather(& send_count, 1, MPI_INT,
422  recv_count_ptr, 1, MPI_INT,
423  p_root, comm);
424 
425  if (MPI_SUCCESS != result) {
426  std::ostringstream s;
427  s << "stk_classic::all_write FAILED: MPI_Gather = " << result ;
428  throw std::runtime_error(s.str());
429  }
430 
431  // Receive counts are only non-zero on the root processor:
432  std::vector<int> recv_displ(p_size + 1, 0);
433 
434  for (int i = 0 ; i < p_size ; ++i) {
435  recv_displ[i + 1] = recv_displ[i] + recv_count[i] ;
436  }
437 
438  const int recv_size = recv_displ[ p_size ] ;
439 
440  std::vector<char> buffer(recv_size);
441 
442  {
443  const char * const send_ptr = message.c_str();
444  char * const recv_ptr = recv_size ? & buffer[0] : (char *) NULL ;
445  int * const recv_displ_ptr = & recv_displ[0] ;
446 
447  result = MPI_Gatherv((void*) send_ptr, send_count, MPI_CHAR,
448  recv_ptr, recv_count_ptr, recv_displ_ptr, MPI_CHAR,
449  p_root, comm);
450  }
451 
452  if (MPI_SUCCESS != result) {
453  std::ostringstream s ;
454  s << "stk_classic::all_write FAILED: MPI_Gatherv = " << result ;
455  throw std::runtime_error(s.str());
456  }
457 
458  if (p_root == (int) p_rank) {
459  bool first = true;
460  for (int i = 0 ; i < p_size ; ++i) {
461  if (recv_count[i]) {
462  if (!first)
463  os << separator;
464  first = false;
465  char * const ptr = & buffer[ recv_displ[i] ];
466  os.write(ptr, recv_count[i]);
467  }
468  }
469  os.flush();
470  }
471  else
472  os << message;
473 #endif
474 }
475 
476 
477 std::ostream &
478 operator<<(
479  std::ostream & os,
480  const MessageType & message_type)
481 {
482 // if (message_type & MSG_SYMMETRIC)
483 // os << "parallel ";
484  os << get_message_type_info(message_type).m_name;
485 
486  return os;
487 }
488 
489 } // namespace stk_classic
std::ostream & print(std::ostream &os, const std::string &indent, const Bucket &bucket)
Print the parts and entities of this bucket.
Definition: Bucket.cpp:259
void add_deferred_message(int message_type, MessageId message_id, size_t throttle_cutoff, int throttle_group, const char *header, const char *aggegrate)
Function add_deferred_message adds a message to the deferred message queue.
Message is an exception.
MessageType
Enumeration MessageType declares the global message types.
Message is informational.
ptrdiff_t MessageId
Typedef MessageId defines a message identifier.
size_t size() const
Member function size returns the byte count of the string of packed bytes creates by put-to operation...
Definition: Marshal.cpp:180
void reset_message_count(unsigned message_type)
Member function reset_message_count ...
void report(const char *message, int type)
Function report calls the current exception reporter to report the message in x.
void register_message_type(unsigned message_type, unsigned max_count, const char *name)
Member function set_message_name ...
Class MessageCode declares a message identifier and throttle characteristics for a message...
Throttle m_throttle
Throttle characteristics.
Class Throttle describes the cutoff limits for a message throttle.
Struct Marshal is a data packer for sending and receiving parallel messages. The data put-to (<<) is ...
Definition: Marshal.hpp:49
unsigned parallel_machine_rank(ParallelMachine parallel_machine)
Member function parallel_machine_rank ...
Definition: Parallel.cpp:29
const std::string & get_message_name(unsigned message_type)
Member function get_message_name ...
std::ostream & operator<<(std::ostream &s, const Bucket &k)
Print the part names for which this bucket is a subset.
Definition: Bucket.cpp:239
unsigned get_max_message_count(unsigned message_type)
Member function get_max_message_count ...
void report_message(const char *message, unsigned message_type, const MessageCode &message_code)
Member function report_message ...
Message is a warning.
unsigned get_message_count(unsigned message_type)
Member function get_message_count ...
unsigned parallel_machine_size(ParallelMachine parallel_machine)
Member function parallel_machine_size ...
Definition: Parallel.cpp:18
Message is a fatal error.
Message is symmetrical.
Sierra Toolkit.
void set_max_message_count(unsigned message_type, unsigned max_count)
Member function set_max_message_count ...
void reset_throttle_group(int throttle_group)
Function reset_message_group sets the count to zero of all messages in the specified throttle group...
MPI_Comm ParallelMachine
Definition: Parallel.hpp:32
Class Bootstrap serves as a bootstrapping mechanism for products in the sierra toolkit and elsewhere...
Definition: Bootstrap.hpp:35
MessageId m_id
Message identifier.
Message is deferred.
void report_deferred_messages(ParallelMachine comm)
Function report_deferred_messages aggregates and reports the message on the root processor.
static MessageCode s_defaultMessageCode
Default message code.
void aggregate_messages(ParallelMachine comm, std::ostringstream &os, const char *separator)
Function aggregate_messages writes a message message to the output string by joining the messages fro...
eastl::iterator_traits< InputIterator >::difference_type count(InputIterator first, InputIterator last, const T &value)