rheolef  7.0
diststream.h
Go to the documentation of this file.
1 # ifndef _RHEOLEF_DISTSTREAM_H
2 # define _RHEOLEF_DISTSTREAM_H
3 // distributed i/o
4 #include "rheolef/distributed.h"
5 #include "rheolef/dis_macros.h"
6 #include "rheolef/catchmark.h"
7 namespace rheolef {
8 
68 class odiststream {
69 public:
70  typedef std::size_t size_type;
71 
72 // allocators/deallocators:
73 
74  odiststream();
75  odiststream (std::string filename, std::string suffix = "",
77  odiststream (std::string filename,
78  io::mode_type mode, const communicator& comm = communicator());
79  odiststream (std::string filename, std::string suffix, const communicator& comm);
80  odiststream (std::string filename, const communicator& comm);
81  odiststream(std::ostream& os, const communicator& comm = communicator());
82  ~odiststream();
83 
84 
85  void open (std::string filename, std::string suffix = "",
87  void open (std::string filename,
88  io::mode_type mode, const communicator& comm = communicator());
89  void open (std::string filename, std::string suffix,
90  const communicator& comm);
91  void open (std::string filename, const communicator& comm);
92  void flush();
93  void close();
94 
95 
96  const communicator& comm() const { return _comm; }
97  bool good() const;
98  operator bool() const { return good(); }
99  static size_type io_proc();
100 
101 
102  std::ostream& os();
103  bool nop();
104 
105 protected:
106  std::ostream* _ptr_os;
109 };
110 @endcode
111 
112 inline
114  : _ptr_os(0), _use_alloc(false), _comm()
115 {
116 }
117 inline
118 odiststream::odiststream (std::string filename, std::string suffix, io::mode_type mode, const communicator& comm)
119  : _ptr_os(0), _use_alloc(false), _comm()
120 {
121  open (filename, suffix, mode, comm);
122 }
123 inline
124 odiststream::odiststream (std::string filename, io::mode_type mode, const communicator& comm)
125  : _ptr_os(0), _use_alloc(false), _comm()
126 {
127  open (filename, mode, comm);
128 }
129 inline
130 odiststream::odiststream (std::string filename, std::string suffix, const communicator& comm)
131  : _ptr_os(0), _use_alloc(false), _comm()
132 {
133  open (filename, suffix, comm);
134 }
135 inline
136 odiststream::odiststream (std::string filename, const communicator& comm)
137  : _ptr_os(0), _use_alloc(false), _comm()
138 {
139  open (filename, comm);
140 }
141 inline
142 odiststream::odiststream(std::ostream& os, const communicator& comm)
143  : _ptr_os(&os), _use_alloc(false), _comm(comm)
144 {
145 }
146 inline
147 void
148 odiststream::open (std::string filename, io::mode_type mode, const communicator& comm)
149 {
150  open (filename, std::string(""), mode, comm);
151 }
152 inline
153 void
154 odiststream::open (std::string filename, std::string suffix, const communicator& comm)
155 {
156  open (filename, suffix, io::out, comm);
157 }
158 inline
159 void
160 odiststream::open (std::string filename, const communicator& comm)
161 {
162  open (filename, std::string(""), io::out, comm);
163 }
164 inline
165 std::ostream&
167  check_macro (_ptr_os != 0, "try to use an uninitialized odiststream");
168  return *_ptr_os;
169 }
170 #ifndef _RHEOLEF_HAVE_MPI
171 inline bool odiststream::nop() { return false; }
172 #else
173 inline bool odiststream::nop() { return size_type(_comm.rank()) != io_proc(); }
174 #endif //_RHEOLEF_HAVE_MPI
175 
176 // standard i/o:
177 # define define_sequential_odiststream_raw_macro(arg) \
178  inline \
179  odiststream& \
180  operator << (odiststream& s, arg) { \
181  if (s.nop()) return s; s.os() << x; return s; \
182  }
183 # define define_sequential_odiststream_macro(T) \
184  define_sequential_odiststream_raw_macro(const T& x)
185 
186 # define define_distributed_odiststream_macro(T) \
187  inline \
188  odiststream& \
189  operator << (odiststream& s, const T& x) { \
190  s.os() << x; return s; \
191  }
192 
193 template <class T>
194 inline
197 {
198  if (s.nop()) return s;
199  s.os() << x;
200  return s;
201 }
206 define_sequential_odiststream_macro(long unsigned int)
212 #ifdef _RHEOLEF_HAVE_FLOAT128
214 #endif // _RHEOLEF_HAVE_FLOAT128
216 define_sequential_odiststream_raw_macro(std::ostream& (*x)(std::ostream&))
217 
218 class idiststream {
219 public:
220  typedef std::size_t size_type;
221 
222 // allocators/deallocators:
223 
224  idiststream();
225  idiststream (std::istream& is, const communicator& comm = communicator());
226  idiststream (std::string filename, std::string suffix = "",
227  const communicator& comm = communicator());
228  ~idiststream();
229 
230 
231  void open (std::string filename, std::string suffix = "",
232  const communicator& comm = communicator());
233  void close();
234 
235 
236  const communicator& comm() const { return _comm; }
237  bool good() const;
238  operator bool() const { return good(); }
239  static size_type io_proc();
240 
241 
242  std::istream& is();
243  bool nop();
244  bool do_load();
245 
246 protected:
247  std::istream* _ptr_is;
248  bool _use_alloc;
250 };
251 @endcode
252 inline
253 idiststream::idiststream()
254  : _ptr_is(0), _use_alloc(false), _comm()
255 {
256 }
257 inline
258 idiststream::idiststream (std::istream& is, const communicator& comm)
259  : _ptr_is(&is), _use_alloc(false), _comm(comm)
260 {
261 }
262 inline
263 idiststream::idiststream (std::string filename, std::string suffix, const communicator& comm)
264  : _ptr_is(0), _use_alloc(false), _comm()
265 {
266  open (filename, suffix, comm);
267 }
268 inline
269 std::istream&
270 idiststream::is()
271 {
272  check_macro (_ptr_is != 0, "try to use an uninitialized idiststream");
273  return *_ptr_is;
274 }
275 #ifndef _RHEOLEF_HAVE_MPI
276 inline bool idiststream::nop() { return false; }
277 inline bool idiststream::do_load() { return true; }
278 #else
279 inline bool idiststream::nop() { return size_type(_comm.rank()) != io_proc(); }
280 inline bool idiststream::do_load() { return size_type(_comm.rank()) == io_proc(); }
281 #endif //_RHEOLEF_HAVE_MPI
282 
283 // standard i/o:
284 #ifdef _RHEOLEF_HAVE_MPI
285 # define define_sequential_idiststream_macro(T) \
286 inline \
287 idiststream& \
288 operator >> (idiststream& s, T& x) \
289 { \
290  if (s.do_load()) { (s.is()) >> x; } \
291  mpi::broadcast (mpi::communicator(), x, s.io_proc()); \
292  return s; \
293 }
294 #else // _RHEOLEF_HAVE_MPI
295 # define define_sequential_idiststream_macro(T) \
296 inline \
297 idiststream& \
298 operator >> (idiststream& s, T& x) \
299 { \
300  (s.is()) >> x; \
301  return s; \
302 }
303 #endif // _RHEOLEF_HAVE_MPI
304 # define define_distributed_idiststream_macro(T) \
305 inline \
306 idiststream& \
307 operator >> (idiststream& s, T& x) \
308 { \
309  s.is() >> x; \
310  return s; \
311 }
312 
317 define_sequential_idiststream_macro(long unsigned int)
322 #ifdef _RHEOLEF_HAVE_FLOAT128
324 #endif // _RHEOLEF_HAVE_FLOAT128
325 
326 extern idiststream din;
327 extern odiststream dout;
330 
331 bool dis_scatch (idiststream& ips, const communicator& comm, std::string ch);
332 
333 inline
334 bool dis_scatch (idiststream& ips, std::string ch)
335 {
336  return dis_scatch (ips, ips.comm(), ch);
337 }
338 inline
339 idiststream&
340 operator>> (idiststream& ids, const catchmark& m)
341 {
342  if (ids.nop()) return ids;
343  ids.is() >> setmark(m.mark());
344  std::string label = "#" + m.mark();
345  if (!scatch(ids.is(),label)) {
346  bool verbose = iorheo::getverbose(ids.is());
347  if (verbose) warning_macro ("catchmark: label `"<< label <<"' not found on input");
348  }
349  return ids;
350 }
351 inline
354 {
355  if (ods.nop()) return ods;
356  ods.os() << setmark(m.mark())
357  << "#" << m.mark() << std::endl;
358  return ods;
359 }
360 int dis_system (const std::string& command, const communicator& comm = communicator());
361 bool dis_file_exists (const std::string& filename, const communicator& comm = communicator());
362 
363 
364 } // namespace rheolef
365 # endif // _RHEOLEF_DISTSTREAM_H
bool dis_scatch(idiststream &ips, const communicator &comm, std::string ch)
Definition: diststream.cc:13
idiststream, odiststream - distributed interface for large data streams
Definition: diststream.h:68
static size_type io_proc()
Definition: diststream.cc:43
#define define_sequential_odiststream_macro(T)
Definition: diststream.h:183
#define define_sequential_idiststream_macro(T)
Definition: diststream.h:295
irheostream, orheostream - large data streams
Definition: compiler.h:7
size_type rank() const
Definition: distributed.h:53
std::ostream * _ptr_os
Definition: diststream.h:106
#define warning_macro(message)
Definition: compiler.h:102
#define check_macro(ok_condition, message)
Definition: compiler.h:104
idiststream din(cin)
const communicator & comm() const
Definition: diststream.h:96
bool good() const
Definition: diststream.cc:154
const std::string & mark() const
Definition: catchmark.h:33
bool scatch(std::istream &in, const std::string &ch, bool full_match=true)
#define define_sequential_odiststream_raw_macro(arg)
Definition: diststream.h:177
void open(std::string filename, std::string suffix="", io::mode_type mode=io::out, const communicator &comm=communicator())
Definition: diststream.cc:103
odiststream dlog(clog)
Definition: diststream.h:328
catchmark - iostream manipulator
Definition: catchmark.h:30
communicator _comm
Definition: diststream.h:108
int dis_system(const std::string &command, const communicator &comm)
Definition: diststream.cc:170
bool dis_file_exists(const std::string &filename, const communicator &comm)
Definition: diststream.cc:189
std::istream & operator>>(std::istream &is, const catchmark &m)
Definition: catchmark.h:48
odiststream derr(cerr)
Definition: diststream.h:329
std::ostream & operator<<(std::ostream &os, const catchmark &m)
Definition: catchmark.h:59
std::size_t size_type
Definition: diststream.h:70
std::ostream & os()
Definition: diststream.h:166
odiststream dout(cout)
Definition: diststream.h:313