rheolef  6.6
diststream.h
Go to the documentation of this file.
1 # ifndef _RHEOLEF_DISTSTREAM_H
2 # define _RHEOLEF_DISTSTREAM_H
3 // distributed i/o
4 #include "rheolef/distributed.h"
5 #include "rheolef/dis_macros.h"
6 #include "rheolef/catchmark.h"
7 namespace rheolef {
8 
// odiststream: output stream wrapper that pairs a std::ostream pointer with a communicator.
// NOTE(review): listing lines 76, 86, 107 and 108 were absent from this extracted listing.
// They are restored here from the open() signature and the _use_alloc/_comm entries of the
// cross-reference index at the end of this page, and from the constructor initializer lists;
// the default arguments on line 76 are taken by analogy with open() -- confirm against the header.
68 class odiststream {
69 public:
70  typedef std::size_t size_type;
71 
72 // allocators/deallocators:
73 
74  odiststream();
75  odiststream (std::string filename, std::string suffix = "",
76  io::mode_type mode = io::out, const communicator& comm = communicator());
77  odiststream (std::string filename,
78  io::mode_type mode, const communicator& comm = communicator());
79  odiststream (std::string filename, std::string suffix, const communicator& comm);
80  odiststream (std::string filename, const communicator& comm);
81  odiststream(std::ostream& os, const communicator& comm = communicator());
82  ~odiststream();
83 
84 
// open(): the shorter overloads forward to the four-argument form (see the inline definitions).
85  void open (std::string filename, std::string suffix = "",
86  io::mode_type mode = io::out, const communicator& comm = communicator());
87  void open (std::string filename,
88  io::mode_type mode, const communicator& comm = communicator());
89  void open (std::string filename, std::string suffix,
90  const communicator& comm);
91  void open (std::string filename, const communicator& comm);
92  void flush();
93  void close();
94 
95 
// accessors: operator bool is a shorthand for good().
96  const communicator& comm() const { return _comm; }
97  bool good() const;
98  operator bool() const { return good(); }
99  static size_type io_proc();
100 
101 
// os() gives access to the wrapped std::ostream; nop() is the test used by the
// sequential operator<< overloads to skip output on a process.
102  std::ostream& os();
103  bool nop();
104 
105 protected:
106  std::ostream* _ptr_os;
107  bool _use_alloc;
108  communicator _comm;
109 };
110 @endcode
111 
// Default constructor: no stream attached (_ptr_os is null), _use_alloc false,
// default-constructed communicator.
// NOTE(review): declarator line 113 was missing from the extracted listing; restored from the
// class declaration `odiststream();` and the initializer list that follows.
112 inline
113 odiststream::odiststream()
114  : _ptr_os(0), _use_alloc(false), _comm()
115 {
116 }
// Construct in the "no stream attached" state, then forward all four arguments
// unchanged to open().
// NOTE(review): _comm is default-constructed here rather than copied from comm;
// presumably open() stores comm -- confirm in diststream.cc.
117 inline
118 odiststream::odiststream (std::string filename, std::string suffix, io::mode_type mode, const communicator& comm)
119  : _ptr_os(0), _use_alloc(false), _comm()
120 {
121  open (filename, suffix, mode, comm);
122 }
// Construct from a file name and an explicit mode (no suffix argument):
// starts with no stream attached, then delegates to open(filename, mode, comm).
123 inline
124 odiststream::odiststream (std::string filename, io::mode_type mode, const communicator& comm)
125  : _ptr_os(0), _use_alloc(false), _comm()
126 {
127  open (filename, mode, comm);
128 }
// Construct from a file name and suffix with an explicit communicator:
// starts with no stream attached, then delegates to open(filename, suffix, comm).
129 inline
130 odiststream::odiststream (std::string filename, std::string suffix, const communicator& comm)
131  : _ptr_os(0), _use_alloc(false), _comm()
132 {
133  open (filename, suffix, comm);
134 }
// Construct from a file name and an explicit communicator only:
// starts with no stream attached, then delegates to open(filename, comm).
135 inline
136 odiststream::odiststream (std::string filename, const communicator& comm)
137  : _ptr_os(0), _use_alloc(false), _comm()
138 {
139  open (filename, comm);
140 }
// Wrap an existing std::ostream: stores its address and a copy of comm; nothing is opened.
// NOTE(review): _use_alloc(false) presumably means the wrapped stream is not owned and will
// not be released by the destructor -- confirm in diststream.cc. The caller must keep `os`
// alive for as long as this object uses the stored pointer.
141 inline
142 odiststream::odiststream(std::ostream& os, const communicator& comm)
143  : _ptr_os(&os), _use_alloc(false), _comm(comm)
144 {
145 }
// open(filename, mode, comm): convenience overload that supplies an empty suffix
// and forwards to the four-argument open().
146 inline
147 void
148 odiststream::open (std::string filename, io::mode_type mode, const communicator& comm)
149 {
150  open (filename, std::string(""), mode, comm);
151 }
// open(filename, suffix, comm): convenience overload that supplies io::out as the mode
// and forwards to the four-argument open().
152 inline
153 void
154 odiststream::open (std::string filename, std::string suffix, const communicator& comm)
155 {
156  open (filename, suffix, io::out, comm);
157 }
// open(filename, comm): convenience overload that supplies an empty suffix and io::out
// and forwards to the four-argument open().
158 inline
159 void
160 odiststream::open (std::string filename, const communicator& comm)
161 {
162  open (filename, std::string(""), io::out, comm);
163 }
// os(): return the wrapped std::ostream by reference.
// check_macro guards against use before a stream has been attached (_ptr_os still null).
// NOTE(review): declarator line 166 was missing from the extracted listing; restored from the
// class declaration `std::ostream& os();` and the index entry placing os() at diststream.h:166.
164 inline
165 std::ostream&
166 odiststream::os() {
167  check_macro (_ptr_os != 0, "try to use an uninitialized odiststream");
168  return *_ptr_os;
169 }
// nop(): true when this process must skip the write (tested by the sequential operator<<).
// Without MPI it is always false; with MPI it is true on every rank other than io_proc().
170 #ifndef _RHEOLEF_HAVE_MPI
171 inline bool odiststream::nop() { return false; }
172 #else
173 inline bool odiststream::nop() { return size_type(_comm.rank()) != io_proc(); }
174 #endif //_RHEOLEF_HAVE_MPI
175 
176 // standard i/o:
// define_sequential_odiststream_raw_macro(arg): generates an inline operator<< whose second
// parameter is declared by `arg`; the declaration must name the parameter `x`, since the
// body writes `x`. The generated operator returns immediately when s.nop() is true,
// otherwise it writes x to s.os().
177 # define define_sequential_odiststream_raw_macro(arg) \
178  inline \
179  odiststream& \
180  operator << (odiststream& s, arg) { \
181  if (s.nop()) return s; s.os() << x; return s; \
182  }
// define_sequential_odiststream_macro(T): same as above with the parameter `const T& x`.
183 # define define_sequential_odiststream_macro(T) \
184  define_sequential_odiststream_raw_macro(const T& x)
185 
// define_distributed_odiststream_macro(T): generates an inline operator<< for `const T&`
// that writes to s.os() unconditionally, i.e. without the s.nop() test used by the
// sequential variant.
186 # define define_distributed_odiststream_macro(T) \
187  inline \
188  odiststream& \
189  operator << (odiststream& s, const T& x) { \
190  s.os() << x; return s; \
191  }
192 
// Generic operator<<: skips the write when s.nop() is true, otherwise writes x to s.os().
// NOTE(review): declarator line 196 was missing from the extracted listing. The names `s` and
// `x` come from the body; the by-value parameter type `T x` is an assumption (it could equally
// be `const T& x`) -- confirm against the original header.
193 template <class T>
194 inline
195 odiststream&
196 operator << (odiststream& s, T x) {
197  if (s.nop()) return s; s.os() << x; return s;
198 }
// Instantiate the sequential operator<< for the built-in types and for stream manipulators.
// NOTE(review): listing lines 199-202 and 204-208 were missing from the extracted listing; they
// are restored from the macro-invocation list preserved in the cross-reference index at the end
// of this page. The last entry is truncated there to "st" and is presumed to be std::string --
// confirm. Lines 210, 213-214, 217 and 219 are also absent and could not be recovered.
199 define_sequential_odiststream_macro(char)
200 define_sequential_odiststream_macro(int)
201 define_sequential_odiststream_macro(unsigned int)
202 define_sequential_odiststream_macro(long int)
203 define_sequential_odiststream_macro(long unsigned int)
204 define_sequential_odiststream_macro(float)
205 define_sequential_odiststream_macro(double)
206 define_sequential_odiststream_macro(long double)
207 define_sequential_odiststream_macro(char *const )
208 define_sequential_odiststream_macro(std::string)
209 #ifdef _RHEOLEF_HAVE_FLOAT128
211 #endif // _RHEOLEF_HAVE_FLOAT128
212 #ifdef _RHEOLEF_HAVE_QD
215 #endif // _RHEOLEF_HAVE_QD
216 #ifdef _RHEOLEF_HAVE_DOUBLEDOUBLE
218 #endif // _RHEOLEF_HAVE_DOUBLEDOUBLE
// Manipulators such as std::endl: a function pointer taking and returning std::ostream&.
220 define_sequential_odiststream_raw_macro(std::ostream& (*x)(std::ostream&))
221 
// idiststream: input stream wrapper that pairs a std::istream pointer with a communicator.
// NOTE(review): listing lines 228, 252 and 253 were missing from the extracted listing. They
// are restored from the cross-reference index at the end of this page (idiststream() at line
// 228, _use_alloc at 252, _comm at 253) and from the constructor initializer lists; confirm
// against the original header.
222 class idiststream {
223 public:
224  typedef std::size_t size_type;
225 
226 // allocators/deallocators:
227 
228  idiststream();
229  idiststream (std::istream& is, const communicator& comm = communicator());
230  idiststream (std::string filename, std::string suffix = "",
231  const communicator& comm = communicator());
232  ~idiststream();
233 
234 
235  void open (std::string filename, std::string suffix = "",
236  const communicator& comm = communicator());
237  void close();
238 
239 
// accessors: operator bool is a shorthand for good().
240  const communicator& comm() const { return _comm; }
241  bool good() const;
242  operator bool() const { return good(); }
243  static size_type io_proc();
244 
245 
// is() gives access to the wrapped std::istream; nop() and do_load() are the tests used
// by the operator>> overloads to decide whether this process reads.
246  std::istream& is();
247  bool nop();
248  bool do_load();
249 
250 protected:
251  std::istream* _ptr_is;
252  bool _use_alloc;
253  communicator _comm;
254 };
255 @endcode
// Default constructor: no stream attached (_ptr_is is null), _use_alloc false,
// default-constructed communicator.
// NOTE(review): declarator line 257 was missing from the extracted listing; restored from the
// initializer list that follows and the default constructor listed in the index.
256 inline
257 idiststream::idiststream()
258  : _ptr_is(0), _use_alloc(false), _comm()
259 {
260 }
// Wrap an existing std::istream: stores its address and a copy of comm; nothing is opened.
// NOTE(review): _use_alloc(false) presumably means the wrapped stream is not owned by this
// object -- confirm in diststream.cc. The caller must keep `is` alive while the pointer is used.
261 inline
262 idiststream::idiststream (std::istream& is, const communicator& comm)
263  : _ptr_is(&is), _use_alloc(false), _comm(comm)
264 {
265 }
// Construct in the "no stream attached" state, then forward all arguments to open().
// NOTE(review): _comm is default-constructed here rather than copied from comm;
// presumably open() stores comm -- confirm in diststream.cc.
266 inline
267 idiststream::idiststream (std::string filename, std::string suffix, const communicator& comm)
268  : _ptr_is(0), _use_alloc(false), _comm()
269 {
270  open (filename, suffix, comm);
271 }
// is(): return the wrapped std::istream by reference.
// check_macro guards against use before a stream has been attached (_ptr_is still null).
// NOTE(review): declarator line 274 was missing from the extracted listing; restored from the
// class declaration `std::istream& is();` and the index entry placing is() at diststream.h:274.
272 inline
273 std::istream&
274 idiststream::is()
275 {
276  check_macro (_ptr_is != 0, "try to use an uninitialized idiststream");
277  return *_ptr_is;
278 }
// nop() / do_load(): complementary tests on whether this process performs the read.
// Without MPI: nop() is always false and do_load() always true.
// With MPI: do_load() is true only on the rank equal to io_proc(); nop() is true on all others.
279 #ifndef _RHEOLEF_HAVE_MPI
280 inline bool idiststream::nop() { return false; }
281 inline bool idiststream::do_load() { return true; }
282 #else
283 inline bool idiststream::nop() { return size_type(_comm.rank()) != io_proc(); }
284 inline bool idiststream::do_load() { return size_type(_comm.rank()) == io_proc(); }
285 #endif //_RHEOLEF_HAVE_MPI
286 
287 // standard i/o:
// define_sequential_idiststream_macro(T): generates an inline operator>> for `T&`.
// With MPI: only the process for which s.do_load() is true reads x from s.is(); the value is
// then broadcast from rank s.io_proc() so every process ends up with it.
// NOTE(review): the broadcast uses a default-constructed mpi::communicator() rather than
// s.comm(); if a stream is ever opened on a sub-communicator these may differ -- confirm
// this is intended.
// Without MPI: x is simply read from s.is().
288 #ifdef _RHEOLEF_HAVE_MPI
289 # define define_sequential_idiststream_macro(T) \
290 inline \
291 idiststream& \
292 operator >> (idiststream& s, T& x) \
293 { \
294  if (s.do_load()) { (s.is()) >> x; } \
295  mpi::broadcast (mpi::communicator(), x, s.io_proc()); \
296  return s; \
297 }
298 #else // _RHEOLEF_HAVE_MPI
299 # define define_sequential_idiststream_macro(T) \
300 inline \
301 idiststream& \
302 operator >> (idiststream& s, T& x) \
303 { \
304  (s.is()) >> x; \
305  return s; \
306 }
307 #endif // _RHEOLEF_HAVE_MPI
// define_distributed_idiststream_macro(T): generates an inline operator>> for `T&` that reads
// from s.is() on every process, with no do_load() test and no broadcast.
308 # define define_distributed_idiststream_macro(T) \
309 inline \
310 idiststream& \
311 operator >> (idiststream& s, T& x) \
312 { \
313  s.is() >> x; \
314  return s; \
315 }
316 
// Instantiations of the sequential operator>> for built-in types.
// NOTE(review): listing lines 317-320, 322-325, 327, 330-331 and 334 are absent from this
// extracted listing; only the `long unsigned int` instantiation survives. The missing entries
// presumably mirror the odiststream list (char, int, unsigned int, long int, float, double,
// long double, std::string) -- TODO confirm against the original header.
321 define_sequential_idiststream_macro(long unsigned int)
326 #ifdef _RHEOLEF_HAVE_FLOAT128
328 #endif // _RHEOLEF_HAVE_FLOAT128
329 #ifdef _RHEOLEF_HAVE_QD
332 #endif // _RHEOLEF_HAVE_QD
333 #ifdef _RHEOLEF_HAVE_DOUBLEDOUBLE
335 #endif // _RHEOLEF_HAVE_DOUBLEDOUBLE
336 
// Predefined distributed streams; the cross-reference index at the end of this page shows them
// defined as din(cin), dout(cout), dlog(clog) and derr(cerr).
// NOTE(review): listing lines 339 and 340 were missing from the extracted listing; restored
// from the index entries placing dlog at diststream.h:339 and derr at diststream.h:340.
337 extern idiststream din;
338 extern odiststream dout;
339 extern odiststream dlog;
340 extern odiststream derr;
341 
// dis_scatch: search `ips` for the string `ch` using an explicit communicator.
// Declared here, defined out of line. NOTE(review): presumably the distributed counterpart of
// scatch() and returns whether `ch` was found -- confirm in the implementation.
342 bool dis_scatch (idiststream& ips, const communicator& comm, std::string ch);
343 
// Convenience overload: uses the stream's own communicator, ips.comm().
344 inline
345 bool dis_scatch (idiststream& ips, std::string ch)
346 {
347  return dis_scatch (ips, ips.comm(), ch);
348 }
// Input manipulator for catchmark: look for the label "#<mark>" on the input.
// Processes for which ids.nop() is true return immediately without touching the stream.
// Otherwise the mark is recorded on the underlying istream via setmark(), then scatch() is
// asked to find the label; if it fails, a warning is emitted only when the stream's verbose
// flag (iorheo::getverbose) is set. The stream is returned in either case.
349 inline
350 idiststream&
351 operator>> (idiststream& ids, const catchmark& m)
352 {
353  if (ids.nop()) return ids;
354  ids.is() >> setmark(m.mark());
355  std::string label = "#" + m.mark();
356  if (!scatch(ids.is(),label)) {
357  bool verbose = iorheo::getverbose(ids.is());
358  if (verbose) warning_macro ("catchmark: label `"<< label <<"' not found on input");
359  }
360  return ids;
361 }
// Output manipulator for catchmark: write the label line "#<mark>" to the stream.
// Processes for which ods.nop() is true return immediately; otherwise the mark is recorded on
// the underlying ostream via setmark() and the label is written, followed by std::endl.
// NOTE(review): declarator line 364 was missing from the extracted listing; restored from the
// names used in the body (ods, m) and the matching operator>> overload -- confirm.
362 inline
363 odiststream&
364 operator<< (odiststream& ods, const catchmark& m)
365 {
366  if (ods.nop()) return ods;
367  ods.os() << setmark(m.mark())
368  << "#" << m.mark() << std::endl;
369  return ods;
370 }
// Declarations only; both are defined out of line and default to a default-constructed communicator.
// NOTE(review): presumably dis_system runs a shell command and dis_file_exists tests for a
// file, coordinated across the processes of `comm` -- confirm in the implementation.
371 int dis_system (const std::string& command, const communicator& comm = communicator());
372 bool dis_file_exists (const std::string& filename, const communicator& comm = communicator());
373 
374 
375 } // namespace rheolef
376 # endif // _RHEOLEF_DISTSTREAM_H
define_sequential_odiststream_macro(char) define_sequential_odiststream_macro(int) define_sequential_odiststream_macro(unsigned int) define_sequential_odiststream_macro(long int) define_sequential_odiststream_macro(long unsigned int) define_sequential_odiststream_macro(float) define_sequential_odiststream_macro(double) define_sequential_odiststream_macro(long double) define_sequential_odiststream_macro(char *const ) define_sequential_odiststream_macro(st idiststream)()
Definition: diststream.h:228
bool nop()
bool dis_scatch(idiststream &ips, std::string ch)
Definition: diststream.h:345
void open(std::string filename, std::string suffix="", const communicator &comm=communicator())
idiststream, odiststream - distributed interface for large data streams
Definition: diststream.h:68
idiststream(std::string filename, std::string suffix="", const communicator &comm=communicator())
static size_type io_proc()
Definition: diststream.cc:43
#define define_sequential_odiststream_macro(T)
Definition: diststream.h:183
#define define_sequential_idiststream_macro(T)
Definition: diststream.h:299
const communicator & comm() const
Definition: diststream.h:96
STL namespace.
const communicator & comm() const
Definition: diststream.h:240
irheostream, orheostream - large data streams
Definition: compiler.h:7
std::ostream * _ptr_os
Definition: diststream.h:106
bool scatch(istream &in, const string &ch, bool full_match)
Definition: rheostream.cc:220
#define warning_macro(message)
Definition: compiler.h:102
idiststream & operator>>(idiststream &ids, const catchmark &m)
Definition: diststream.h:351
#define check_macro(ok_condition, message)
Definition: compiler.h:104
std::istream & is()
Definition: diststream.h:274
static size_type io_proc()
idiststream din(cin)
bool good() const
Definition: diststream.cc:154
#define define_sequential_odiststream_raw_macro(arg)
Definition: diststream.h:177
void open(std::string filename, std::string suffix="", io::mode_type mode=io::out, const communicator &comm=communicator())
Definition: diststream.cc:103
odiststream dlog(clog)
Definition: diststream.h:339
size_type rank() const
Definition: distributed.h:53
bool _use_alloc
Definition: diststream.h:252
catchmark - iostream manipulator
Definition: catchmark.h:30
communicator _comm
Definition: diststream.h:108
int dis_system(const std::string &command, const communicator &comm)
Definition: diststream.cc:170
bool dis_file_exists(const std::string &filename, const communicator &comm)
Definition: diststream.cc:189
bool good() const
odiststream derr(cerr)
Definition: diststream.h:340
std::ostream & operator<<(std::ostream &os, const catchmark &m)
Definition: catchmark.h:59
std::istream * _ptr_is
Definition: diststream.h:251
reference_element_H::size_type size_type
~idiststream()
Definition: diststream.cc:77
const std::string & mark() const
Definition: catchmark.h:33
bool do_load()
Definition: diststream.h:281
std::size_t size_type
Definition: diststream.h:70
std::ostream & os()
Definition: diststream.h:166
odiststream dout(cout)
Definition: diststream.h:317
communicator _comm
Definition: diststream.h:253
ostream & operator<<(ostream &os, const tiny_element &K)
Definition: tiny_element.cc:7
void close()