rheolef  6.3
domain_indirect_mpi.cc
Go to the documentation of this file.
1 
2 #include "rheolef/config.h"
3 
4 #ifdef _RHEOLEF_HAVE_MPI
5 #include "rheolef/domain_indirect.h"
6 #include "rheolef/geo.h"
7 
8 namespace rheolef {
9 
10 template<class T>
12  const geo_basic<T,distributed>& omega,
13  const std::string& name,
14  size_type map_dim,
15  const communicator& comm,
16  const std::vector<size_type>& ie_list)
17  : domain_indirect_base_rep<distributed>(name, map_dim, comm, ie_list),
18  _ioige2ini_dis_ioige(),
19  _ini_ioige2dis_ioige()
20 {
21  // compute i/o _ioige2ini_dis_ioige & _ioige2ini_dis_ioige numbering tables
23  distributor ios_ownership = omega.ios_sizes().ownership_by_dimension [map_dim];
24  array<size_type,distributed> ios_mark (ios_ownership, unset);
25  distributor dom_ownership = base::ownership();
26  size_type first_dom_dis_ie = dom_ownership.first_index();
27  size_type dom_ie = 0;
28  for (const_iterator_ioige iter = base::ioige_begin(), last = base::ioige_end();
29  iter != last; iter++, dom_ie++) {
30  size_type ie = (*iter).index();
31  size_type ios_dis_ie = omega.ige2ios_dis_ige (map_dim, ie);
32  size_type dom_dis_ie = first_dom_dis_ie + dom_ie;
33  ios_mark.dis_entry (ios_dis_ie) = dom_dis_ie;
34  }
35  ios_mark.dis_entry_assembly();
36  size_type dom_ini_size = 0;
37  for (size_type ios_ie = 0, ios_ne = ios_mark.size(); ios_ie < ios_ne; ios_ie++) {
38  if (ios_mark [ios_ie] != unset) dom_ini_size++;
39  }
40  distributor dom_ini_ownership (distributor::decide, comm, dom_ini_size);
41  _ini_ioige2dis_ioige.resize (dom_ini_ownership, unset);
42  size_type dom_ini_ie = 0;
43  for (size_type ios_ie = 0, ios_ne = ios_mark.size(); ios_ie < ios_ne; ios_ie++) {
44  if (ios_mark [ios_ie] == unset) continue;
45  _ini_ioige2dis_ioige [dom_ini_ie] = ios_mark [ios_ie];
46  dom_ini_ie++;
47  }
48  _ioige2ini_dis_ioige.resize (dom_ownership, unset);
49  _ini_ioige2dis_ioige.reverse_permutation (_ioige2ini_dis_ioige);
50 }
159 template<class U>
160 idiststream&
162  idiststream& ips,
163  const geo_rep<U,distributed>& omega)
164 {
166  communicator comm = omega.comm();
167  if ( ! dis_scatch (ips, comm, "\ndomain")) {
168  ips.is().setstate (std::ios::badbit);
169  return ips;
170  }
171  size_type version, dis_noige;
172  ips >> base::_name
173  >> version
174  >> base::_map_dim
175  >> dis_noige;
176  check_macro (version == 2, "unsupported version="<<version<<" domain format");
177 
178  distributor ini_ioige_ownership (dis_noige, comm);
179  array<geo_element_indirect,distributed> ini_oige (ini_ioige_ownership);
180  ini_oige.get_values (ips);
181  distributor ios_ige_ownership = omega.geo_element_ios_ownership (base::_map_dim);
182  array<size_type> ios_owner (ini_ioige_ownership, unset);
183  for (size_type ini_ioige = 0, ini_noige = ini_oige.size();
184  ini_ioige < ini_noige; ini_ioige++) {
185  size_type ios_ige = ini_oige [ini_ioige].index();
186  ios_owner [ini_ioige] = ios_ige_ownership.find_owner (ios_ige);
187  }
188  array<geo_element_indirect> ios_oige;
189  array<size_type> ini_ioige2ios_dis_ioige;
190  array<size_type> ios_ioige2ini_dis_ioige;
191  ini_oige.repartition (
192  ios_owner,
193  ios_oige,
194  ios_ioige2ini_dis_ioige,
195  ini_ioige2ios_dis_ioige);
196 
197  distributor ios_ioige_ownership = ios_oige.ownership();
198  array<geo_element_indirect> tmp_oige (ios_ioige_ownership);
199  for (size_type ios_ioige = 0, ios_noige = ios_ioige_ownership.size();
200  ios_ioige < ios_noige; ios_ioige++) {
201  orientation_type orient = ios_oige [ios_ioige].orientation();
202  size_type ios_dis_ige = ios_oige [ios_ioige].index();
203  size_type ios_ige = ios_dis_ige - ios_ige_ownership.first_index();
204  size_type dis_ige = omega.ios_ige2dis_ige (base::_map_dim, ios_ige);
205  tmp_oige [ios_ioige].set (orient, dis_ige);
206  }
207  distributor ige_ownership = omega.geo_element_ownership (base::_map_dim);
208  array<size_type> partition (ios_ioige_ownership, unset);
209  for (size_type ios_ioige = 0, ios_noige = ios_ioige_ownership.size();
210  ios_ioige < ios_noige; ios_ioige++) {
211  size_type ige = tmp_oige [ios_ioige].index();
212  partition [ios_ioige] = ige_ownership.find_owner (ige);
213  }
214  array<size_type> ios_ioige2dis_ioige;
215  array<size_type> ioige2ios_dis_ioige;
216  tmp_oige.repartition (
217  partition,
218  *this,
219  ioige2ios_dis_ioige,
220  ios_ioige2dis_ioige);
221 
222  distributor ioige_ownership = array<geo_element_indirect>::ownership();
223  for (size_type ioige = 0, noige = ioige_ownership.size(); ioige < noige; ioige++) {
224  size_type dis_ige = operator[] (ioige).index();
225  size_type ige = dis_ige - ige_ownership.first_index();
226  operator[] (ioige).set_index (ige);
227  }
228  _ini_ioige2dis_ioige.resize (ini_ioige_ownership, unset);
229  _ioige2ini_dis_ioige.resize ( ioige_ownership, unset);
230  for (size_type ios_ioige = 0, ios_noige = ios_ioige_ownership.size();
231  ios_ioige < ios_noige; ios_ioige++) {
232  size_type ini_dis_ioige = ios_ioige2ini_dis_ioige [ios_ioige];
233  size_type dis_ioige = ios_ioige2dis_ioige [ios_ioige];
234  _ini_ioige2dis_ioige.dis_entry (ini_dis_ioige) = dis_ioige;
235  _ioige2ini_dis_ioige.dis_entry (dis_ioige) = ini_dis_ioige;
236  }
237  _ioige2ini_dis_ioige.dis_entry_assembly();
238  _ini_ioige2dis_ioige.dis_entry_assembly();
239 
240  return ips;
241 }
242 template<class U>
245  odiststream& ops,
246  const geo_rep<U,distributed>& omega) const
247 {
248  using namespace std;
249  ops << "domain" << endl
250  << base::_name << endl
251  << "2 " << base::_map_dim << " " << dis_size() << endl;
252  distributor ioige_ownership = array<geo_element_indirect>::ownership();
253  distributor ini_ioige_ownership = _ini_ioige2dis_ioige.ownership();
254  distributor ige_ownership = omega.geo_element_ownership (base::_map_dim);
255  array<geo_element_indirect> ini_oige (ini_ioige_ownership);
256  for (size_type ioige = 0, noige = ioige_ownership.size(); ioige < noige; ioige++) {
257  size_type ini_dis_ioige = _ioige2ini_dis_ioige [ioige];
258  orientation_type orient = operator[] (ioige).orientation();
259  size_type ige = operator[] (ioige).index();
260  size_type ios_dis_ige = omega.ige2ios_dis_ige (base::_map_dim, ige);
261  ini_oige.dis_entry (ini_dis_ioige) = geo_element_indirect (orient, ios_dis_ige);
262  }
263  ini_oige.dis_entry_assembly();
264  ini_oige.put_values (ops);
265  return ops;
266 }
267 template
268 idiststream&
270  idiststream& ips,
271  const geo_rep<Float,distributed>& omega);
272 
273 template
276  odiststream& ops,
277  const geo_rep<Float,distributed>& omega) const;
278 
279 #define _RHEOLEF_instanciation(T,M) \
280 template \
281 domain_indirect_rep<M>::domain_indirect_rep ( \
282  const geo_basic<T,M>& omega, \
283  const std::string& name, \
284  size_type map_dim, \
285  const communicator& comm, \
286  const std::vector<size_type>& ie_list);
287 
289 
290 } // namespace rheolef
291 #endif // _RHEOLEF_HAVE_MPI
292