rheolef  6.5
mpi_assembly_end.h
Go to the documentation of this file.
1 #ifndef _RHEO_MPI_ASSEMBLY_END_H
2 #define _RHEO_MPI_ASSEMBLY_END_H
3 
4 #include "rheolef/msg_util.h"
5 #include <boost/functional.hpp>
6 #include <boost/iterator/transform_iterator.hpp>
7 
8 namespace rheolef {
9 
// Completes a message exchange made of pending MPI requests: drains the
// pending receives, passes every received element to `x`, waits for the
// pending sends, then clears both message buffers.
//
// Template parameters:
//   Container - type of `x`; must provide a nested `data_type`, be callable
//               with one received element, and provide `n_new_entry()`.
//   Message   - type providing `waits` (a std::list of (index, mpi::request)
//               pairs, per the iterator typedef below) and an indexable,
//               clearable `data` member.
//   Size      - integer type used for indices and sizes.
//
// Parameters:
//   receive          - incoming buffer; its `waits` are consumed and its
//                      `data` is read, then both are cleared.
//   send             - outgoing buffer; its `waits` are waited on, then its
//                      `waits` and `data` are cleared.
//   receive_max_size - stride between messages in `receive.data`: the message
//                      with index i starts at i*receive_max_size.
//   x                - taken by value; called as x(receive.data[j]) for each
//                      received element.
//
// Returns: x.n_new_entry().
//
// NOTE(review): `select2nd` and `check_macro` are declared outside this
// file; `select2nd` presumably projects a pair onto its second member (the
// request) and `check_macro` presumably reports a fatal error when its
// condition is false -- confirm against their definitions.
22 template <
23  class Container,
24  class Message,
25  class Size>
26 Size
// NOTE(review): the line carrying the function name and the opening
// parenthesis (listing line 27) is missing from this extract; judging by the
// file name it is presumably `mpi_assembly_end (` -- confirm against the
// original header.
28  Message& receive,
29  Message& send,
30  Size receive_max_size,
31  Container x)
32 {
33  typedef Size size_type;
34  typedef typename Container::data_type data_type;
35 
  // Iterator adaptor that lets the mpi::wait_* calls walk the list of
  // (index, request) pairs through select2nd.
36  typedef boost::transform_iterator<select2nd<size_type,mpi::request>,
37  typename std::list<std::pair<size_type,mpi::request> >::iterator>
38  request_iterator;
39 
  // The macro is defined unconditionally right here, so the wait_any branch
  // below is always the one compiled; the #else branch is kept as an
  // alternative implementation but is currently compiled out.
40 #define _RHEOLEF_BUG_FOR_NON_MPI_DATA_TYPE
41 #ifdef _RHEOLEF_BUG_FOR_NON_MPI_DATA_TYPE
  // Process incoming messages one at a time, in completion order, until no
  // receive request is left in the list.
42  while (receive.waits.size() != 0) {
43  request_iterator iter_r_waits (receive.waits.begin(), select2nd<size_type,mpi::request>()),
44  last_r_waits (receive.waits.end(), select2nd<size_type,mpi::request>());
  // wait_any returns the status of one completed request and an iterator
  // designating that request.
45  std::pair<mpi::status,request_iterator> pair_status = mpi::wait_any (iter_r_waits, last_r_waits);
  // Number of data_type elements in the completed message (optional: may be
  // unset, which is checked just below).
46  boost::optional<int> i_msg_size_opt = pair_status.first.template count<data_type>();
47  check_macro (i_msg_size_opt, "receive wait failed");
48  int iproc = pair_status.first.source();
49  check_macro (iproc >= 0, "receive: source iproc = "<<iproc<<" < 0 !");
50  size_type i_msg_size = (size_type)i_msg_size_opt.get();
  // base() recovers the underlying list iterator, giving access to the
  // message index stored next to the request.
51  typename std::list<std::pair<size_type,mpi::request> >::iterator i_pair_ptr = pair_status.second.base();
52  size_type i_receive = (*i_pair_ptr).first;
  // Each message owns a slot of receive_max_size entries in receive.data;
  // only the first i_msg_size entries of the slot were actually received.
53  size_type i_start = i_receive*receive_max_size;
54  for (size_type j = i_start; j < i_start + i_msg_size; j++) {
55  x (receive.data[j]);
56  }
  // Remove the completed request so the next wait_any only sees pending ones
  // and the loop eventually terminates.
57  receive.waits.erase (i_pair_ptr);
58  }
59 #else // _RHEOLEF_BUG_FOR_NON_MPI_DATA_TYPE
60  // wait_all works better when using an array of non mpi_data_type, as an array of set<size_t>
61  request_iterator iter_r_waits (receive.waits.begin(), select2nd<size_type,mpi::request>()),
62  last_r_waits (receive.waits.end(), select2nd<size_type,mpi::request>());
63  std::vector<mpi::status> recv_status (receive.waits.size());
64  mpi::wait_all (iter_r_waits, last_r_waits, recv_status.begin());
  // Unlike the wait_any branch, the slot offset here is derived from the
  // position i_recv in the status array, not from the index stored in the
  // (index, request) pair.
65  for (size_type i_recv = 0, n_recv = recv_status.size(); i_recv < n_recv; i_recv++) {
66  boost::optional<int> i_msg_size_opt = recv_status[i_recv].template count<data_type>();
67  check_macro (i_msg_size_opt, "receive wait failed");
68  int iproc = recv_status[i_recv].source();
69  check_macro (iproc >= 0, "receive: source iproc = "<<iproc<<" < 0 !");
70  size_type i_msg_size = (size_type)i_msg_size_opt.get();
71  size_type i_start = i_recv*receive_max_size;
72  for (size_type j = i_start; j < i_start + i_msg_size; j++) {
73  x (receive.data[j]);
74  }
75  }
76 #endif // _RHEOLEF_BUG_FOR_NON_MPI_DATA_TYPE
  // Wait for every pending send request before send.data is cleared below.
77  request_iterator iter_s_waits (send.waits.begin(), select2nd<size_type,mpi::request>()),
78  last_s_waits (send.waits.end(), select2nd<size_type,mpi::request>());
79  Size send_nproc = send.waits.size();
80  std::vector<mpi::status> send_status(send_nproc);
81  mpi::wait_all (iter_s_waits, last_s_waits, send_status.begin());
  // Reset both message buffers.
82  send.waits.clear();
83  send.data.clear();
84  receive.waits.clear();
85  receive.data.clear();
86  return x.n_new_entry();
87 }
88 // @endcode
89 } // namespace rheolef
90 #endif //_RHEO_MPI_ASSEMBLY_END_H
91