rheolef  6.5
mpi_scatter_init.h
#ifndef _RHEO_MPI_SCATTER_INIT_H
#define _RHEO_MPI_SCATTER_INIT_H

#include "rheolef/compiler.h"
#include "rheolef/distributed.h"
#include "rheolef/scatter_message.h"

#include "rheolef/msg_sort_with_permutation.h"
#include "rheolef/msg_to_context.h"
#include "rheolef/msg_from_context_pattern.h"
#include "rheolef/msg_from_context_indices.h"
#include "rheolef/msg_local_context.h"
#include "rheolef/msg_local_optimize.h"

#include "rheolef/msg_util.h"
#include <boost/functional.hpp>
#include <boost/iterator/transform_iterator.hpp>

namespace rheolef {

template <class Message, class Size, class SizeRandomIterator1,
          class SizeRandomIterator2, class SizeRandomIterator3, class Tag>
void
mpi_scatter_init (
  Size                nidx,
  SizeRandomIterator1 idx,
  Size                nidy,
  SizeRandomIterator2 idy,
  Size                idy_maxval,
  SizeRandomIterator3 ownership,
  Tag                 tag,
  const distributor::communicator_type& comm,
  Message&            from,
  Message&            to)
{
  typedef Size size_type;
  size_type my_proc = comm.rank();
  size_type nproc   = comm.size();

  std::vector<size_type> msg_size (nproc, 0);
  std::vector<size_type> msg_mark (nproc, 0);
  std::vector<size_type> owner    (nidx);
  size_type send_nproc = 0;
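  // determine, for each requested index idx[i], the process that owns it:
  // process iproc owns the contiguous global range [ownership[iproc], ownership[iproc+1])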
  {
    size_type iproc = 0;
    for (size_type i = 0; i < nidx; i++) {
      for (; iproc < nproc; iproc++) {
        if (idx[i] >= ownership[iproc] && idx[i] < ownership[iproc+1]) {
          owner[i] = iproc;
          msg_size[iproc]++;
          if (!msg_mark[iproc]) {
            msg_mark[iproc] = 1;
            send_nproc++;
          }
          break;
        }
      }
      check_macro (iproc != nproc, "bad stash data: idx["<<i<<"]="<<idx[i]<<" out of range [0:"<<ownership[nproc]<<"[");
    }
  } // end block
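  // indices owned by the local process are not sent over MPI:
  // they are removed from the message counts and handled below as local copies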
  size_type n_local = msg_size[my_proc];
  if (n_local != 0) {
    msg_size[my_proc] = 0;
    msg_mark[my_proc] = 0;
    send_nproc--;
  }
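  // two all_reduce passes: the first counts how many processes will send to this one,
  // the second computes the maximum message size, used to dimension the receive buffer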
  std::vector<size_type> work (nproc);
  mpi::all_reduce (
    comm,
    msg_mark.begin().operator->(),
    nproc,
    work.begin().operator->(),
    std::plus<size_type>());
  size_type receive_nproc = work[my_proc];
  mpi::all_reduce (
    comm,
    msg_size.begin().operator->(),
    nproc,
    work.begin().operator->(),
    mpi::maximum<size_type>());
  size_type receive_max_size = work[my_proc];
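  // post one non-blocking receive per expected incoming message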
  std::list<std::pair<size_type,mpi::request> > receive_waits;
  std::vector<size_type> receive_data (receive_nproc*receive_max_size);
  for (size_type i_receive = 0; i_receive < receive_nproc; i_receive++) {
    mpi::request i_req = comm.irecv (
      mpi::any_source,
      tag,
      receive_data.begin().operator->() + i_receive*receive_max_size,
      receive_max_size);
    receive_waits.push_back (std::make_pair(i_receive, i_req));
  }
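  // copy the requested global indices into a contiguous send buffer
  // and post one non-blocking send per destination process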
  std::vector<size_type> send_data (nidx);
  std::copy (idx, idx+nidx, send_data.begin());
  std::list<std::pair<size_type,mpi::request> > send_waits;
  {
    size_type i_send  = 0;
    size_type i_start = 0;
    for (size_type iproc = 0; iproc < nproc; iproc++) {
      size_type i_msg_size = msg_size[iproc];
      if (i_msg_size == 0) continue;
      mpi::request i_req = comm.isend (
        iproc,
        tag,
        send_data.begin().operator->() + i_start,
        i_msg_size);
      send_waits.push_back (std::make_pair(i_send, i_req));
      i_send++;
      i_start += i_msg_size;
    }
  } // end block
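  // wait for all receives to complete, recording the source process and size of each message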
  typedef boost::transform_iterator<select2nd<size_t,mpi::request>, std::list<std::pair<size_t,mpi::request> >::iterator>
    request_iterator;
  std::vector<size_type> receive_size (receive_nproc);
  std::vector<size_type> receive_proc (receive_nproc);
  size_type receive_total_size = 0;
  while (receive_waits.size() != 0) {
    typedef size_type data_type; // exchanged data is of "size_type"
    request_iterator iter_r_waits (receive_waits.begin(), select2nd<size_t,mpi::request>()),
                     last_r_waits (receive_waits.end(),   select2nd<size_t,mpi::request>());
    std::pair<mpi::status,request_iterator> pair_status = mpi::wait_any (iter_r_waits, last_r_waits);
    boost::optional<int> i_msg_size_opt = pair_status.first.count<data_type>();
    check_macro (i_msg_size_opt, "receive wait failed");
    int iproc = pair_status.first.source();
    check_macro (iproc >= 0, "receive: source iproc = "<<iproc<<" < 0 !");
    size_type i_msg_size = (size_t)i_msg_size_opt.get();
    std::list<std::pair<size_t,mpi::request> >::iterator i_pair_ptr = pair_status.second.base();
    size_type i_receive = (*i_pair_ptr).first;
    receive_proc[i_receive] = iproc;
    receive_size[i_receive] = i_msg_size;
    receive_total_size += i_msg_size;
    receive_waits.erase (i_pair_ptr);
  }
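  // build the receive ("to") context: order incoming messages by source process
  // and fill to.procs(), to.starts() and to.indices()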
  to.resize (receive_total_size, receive_nproc);

  std::vector<size_type> perm (receive_nproc);
  copy (index_iterator<size_type>(), index_iterator<size_type>(receive_nproc), perm.begin());
  msg_sort_with_permutation (
    receive_nproc,
    receive_proc.begin().operator->(),
    perm.begin().operator->());
  size_type istart = ownership[my_proc]; // = ownership.first_index()
  msg_to_context (
    perm.begin(),
    perm.end(),
    receive_proc.begin(),
    receive_size.begin(),
    receive_data.begin(),
    receive_max_size,
    istart,
    to.procs().begin(),
    to.starts().begin(),
    to.indices().begin());
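  // build the send ("from") context: fill from.procs(), from.starts() and from.indices()
  // from the per-process message sizes and the owner of each index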
  from.resize (nidy, send_nproc);
  std::vector<size_type> proc2from_proc (nproc);
  msg_from_context_pattern (
    msg_size.begin(),
    msg_size.end(),
    from.procs().begin(),
    from.starts().begin(),
    proc2from_proc.begin());
  std::vector<size_type> start (send_nproc+1);
  copy (from.starts().begin(), from.starts().end(), start.begin());
  msg_from_context_indices (
    owner.begin(),
    owner.end(),
    idy,
    proc2from_proc.begin(),
    my_proc,
    idy_maxval,
    start.begin(),
    from.indices().begin());
  request_iterator iter_s_waits (send_waits.begin(), select2nd<size_type,mpi::request>()),
                   last_s_waits (send_waits.end(),   select2nd<size_type,mpi::request>());
  mpi::wait_all (iter_s_waits, last_s_waits);
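  // record the entries owned by the local process as local slots on both sides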
  from.local_slots.resize (n_local);
  to.local_slots.resize (n_local);
  size_type ilast = ownership[my_proc+1]; // = ownership.last_index()
  msg_local_context (
    idx,
    idx+nidx,
    idy,
    idy_maxval,
    istart,
    ilast,
    to.local_slots.begin(),
    to.local_slots.end(),
    from.local_slots.begin());
  // 17) Optimize local exchanges during gather/scatter
  bool has_opt = msg_local_optimize (
    to.local_slots.begin(),
    to.local_slots.end(),
    from.local_slots.begin());

  if (has_opt && n_local != 0) {
    to.local_is_copy       = true;
    to.local_copy_start    = to.local_slots[0];
    to.local_copy_length   = n_local;
    from.local_is_copy     = true;
    from.local_copy_start  = from.local_slots[0];
    from.local_copy_length = n_local;
  }
}
} // namespace rheolef
#endif // _RHEO_MPI_SCATTER_INIT_H