1 /*
2 This file is part of CDO. CDO is a collection of Operators to manipulate and analyse Climate model Data.
3
4 Author: Uwe Schulzweida
5 Oliver Heidmann
6
7 */
8
9 #ifdef HAVE_CONFIG_H
10 #include "config.h"
11 #endif
12
13 #include <cdi.h>
14
15 #include "pipeStream.h"
16 #include "cdo_output.h"
17
18 #ifdef HAVE_LIBPTHREAD
19
20 #include "pthread_debug.h"
21
PipeStream(int p_processID)22 PipeStream::PipeStream(int p_processID)
23 {
24 ispipe = true;
25 m_pipe = std::make_shared<pipe_t>();
26 m_pipe->pipe_set_name(p_processID, m_cdoStreamID);
27 m_name = m_pipe->name;
28 }
29
30 int
open_read()31 PipeStream::open_read()
32 {
33 rthreadID = pthread_self();
34 isopen = true;
35 return m_cdoStreamID;
36 }
37
38 int
open_write(int filetype)39 PipeStream::open_write(int filetype)
40 {
41 Debug(PIPE_STREAM, "pipe %s", m_pipe->name);
42
43 wthreadID = pthread_self();
44 m_filetype = filetype;
45 isopen = true;
46
47 return m_cdoStreamID;
48 }
49
50 int
open_append()51 PipeStream::open_append()
52 {
53 cdo_warning("Operator does not support pipes!");
54 return -1;
55 }
56
57 int
inq_vlist()58 PipeStream::inq_vlist()
59 {
60 // m_vlist is changed when the whlist was successfully defined by another pipe!!
61 auto vlistID = m_pipe->pipe_inq_vlist(m_vlistID);
62 if (vlistID == -1) cdo_abort("Couldn't read data from input stream %s!", m_name.c_str());
63 return vlistID;
64 }
65
66 void
def_vlist(int p_vlistID)67 PipeStream::def_vlist(int p_vlistID)
68 {
69 Debug(PIPE_STREAM, "%s pstreamID %d", m_pipe->name, m_cdoStreamID);
70 auto vlistIDcp = vlistDuplicate(p_vlistID);
71 m_pipe->pipe_def_vlist(m_vlistID, vlistIDcp);
72 }
73
74 void
inq_record(int * varID,int * levelID)75 PipeStream::inq_record(int *varID, int *levelID)
76 {
77 m_pipe->pipe_inq_record(varID, levelID);
78 }
79
80 void
defRecord(int varID,int levelID)81 PipeStream::defRecord(int varID, int levelID)
82 {
83 m_pipe->pipe_def_record(varID, levelID);
84 }
85
86 void
read_record(float * p_data,size_t * p_nmiss)87 PipeStream::read_record(float *p_data, size_t *p_nmiss)
88 {
89 m_nvals += m_pipe->pipe_read_record(m_vlistID, p_data, p_nmiss);
90 }
91
92 void
read_record(double * p_data,size_t * p_nmiss)93 PipeStream::read_record(double *p_data, size_t *p_nmiss)
94 {
95 m_nvals += m_pipe->pipe_read_record(m_vlistID, p_data, p_nmiss);
96 }
97
98 void
read_record(Field * p_field,size_t * p_nmiss)99 PipeStream::read_record(Field *p_field, size_t *p_nmiss)
100 {
101 m_nvals += m_pipe->pipe_read_record(m_vlistID, p_field, p_nmiss);
102 }
103
104 void
write_record(float * p_data,size_t p_nmiss)105 PipeStream::write_record(float *p_data, size_t p_nmiss)
106 {
107 m_pipe->pipe_write_record(p_data, p_nmiss);
108 }
109
110 void
write_record(double * p_data,size_t p_nmiss)111 PipeStream::write_record(double *p_data, size_t p_nmiss)
112 {
113 m_pipe->pipe_write_record(p_data, p_nmiss);
114 }
115
116 void
write_record(Field * p_field,size_t p_nmiss)117 PipeStream::write_record(Field *p_field, size_t p_nmiss)
118 {
119 m_pipe->pipe_write_record(p_field, p_nmiss);
120 }
121
122 void
copyRecord(CdoStreamID p_destination)123 PipeStream::copyRecord(CdoStreamID p_destination)
124 {
125 (void) p_destination;
126 // Not implemented for pipes
127 // Cdi handles this. And in cdo we would have to decompress and recompress for copy operations
128 // which is very resource intensive (also lossy compression)
129 cdo_warning("Copy Record not possible with piped streams");
130 }
131
132 int
inq_timestep(int p_tsID)133 PipeStream::inq_timestep(int p_tsID)
134 {
135 auto nrecs = m_pipe->pipe_inq_timestep(p_tsID);
136 m_tsID = p_tsID;
137 Debug(PIPE_STREAM, "PipeStream: Current TsID: %d, nrecs: %d", m_tsID, nrecs);
138 return nrecs;
139 }
140
141 void
def_timestep(int p_tsID)142 PipeStream::def_timestep(int p_tsID)
143 {
144 Debug(PIPE_STREAM, "%s pstreamID %d", m_pipe->name, m_cdoStreamID);
145 m_pipe->pipe_def_timestep(m_vlistID, p_tsID);
146 }
147
148 int
inqFileType()149 PipeStream::inqFileType()
150 {
151 return m_filetype;
152 }
153
154 int
inqByteorder()155 PipeStream::inqByteorder()
156 {
157 return m_filetype;
158 }
159
160 void
waitForPipe()161 PipeStream::waitForPipe()
162 {
163 m_pipe->close();
164 std::unique_lock<std::mutex> locked_mutex(m_pipe->m_mutex);
165 while (isopen)
166 {
167 Debug(PIPE_STREAM, "wait of read close");
168 m_pipe->isClosed_cond.wait(locked_mutex);
169 }
170 }
171
172 void
close()173 PipeStream::close()
174 {
175 auto threadID = pthread_self();
176
177 Debug(PIPE_STREAM,"streamID: %d thID: %ld rthID: %ld wthID: %ld",get_id(), threadID, rthreadID, wthreadID);
178
179 if (pthread_equal(threadID, rthreadID))
180 {
181 isopen = false;
182 m_pipe->close();
183 pthread_join(wthreadID, nullptr);
184 }
185 else if (pthread_equal(threadID, wthreadID))
186 {
187 waitForPipe();
188 }
189 else
190 {
191 cdo_abort("Internal problem! Close pipe %s", m_name);
192 }
193 }
194
195 size_t
getNvals()196 PipeStream::getNvals()
197 {
198 return m_nvals;
199 }
200
201 #endif
202