1 /*
2   This file is part of CDO. CDO is a collection of Operators to manipulate and analyse Climate model Data.
3 
4   Author: Uwe Schulzweida
5           Oliver Heidmann
6 
7 */
8 
9 #ifdef HAVE_CONFIG_H
10 #include "config.h"
11 #endif
12 
13 #include <cdi.h>
14 
15 #include "pipeStream.h"
16 #include "cdo_output.h"
17 
18 #ifdef HAVE_LIBPTHREAD
19 
20 #include "pthread_debug.h"
21 
PipeStream(int p_processID)22 PipeStream::PipeStream(int p_processID)
23 {
24   ispipe = true;
25   m_pipe = std::make_shared<pipe_t>();
26   m_pipe->pipe_set_name(p_processID, m_cdoStreamID);
27   m_name = m_pipe->name;
28 }
29 
30 int
open_read()31 PipeStream::open_read()
32 {
33   rthreadID = pthread_self();
34   isopen = true;
35   return m_cdoStreamID;
36 }
37 
38 int
open_write(int filetype)39 PipeStream::open_write(int filetype)
40 {
41   Debug(PIPE_STREAM, "pipe %s", m_pipe->name);
42 
43   wthreadID = pthread_self();
44   m_filetype = filetype;
45   isopen = true;
46 
47   return m_cdoStreamID;
48 }
49 
50 int
open_append()51 PipeStream::open_append()
52 {
53   cdo_warning("Operator does not support pipes!");
54   return -1;
55 }
56 
57 int
inq_vlist()58 PipeStream::inq_vlist()
59 {
60   // m_vlist is changed when the whlist was successfully defined by another pipe!!
61   auto vlistID = m_pipe->pipe_inq_vlist(m_vlistID);
62   if (vlistID == -1) cdo_abort("Couldn't read data from input stream %s!", m_name.c_str());
63   return vlistID;
64 }
65 
66 void
def_vlist(int p_vlistID)67 PipeStream::def_vlist(int p_vlistID)
68 {
69   Debug(PIPE_STREAM, "%s pstreamID %d", m_pipe->name, m_cdoStreamID);
70   auto vlistIDcp = vlistDuplicate(p_vlistID);
71   m_pipe->pipe_def_vlist(m_vlistID, vlistIDcp);
72 }
73 
74 void
inq_record(int * varID,int * levelID)75 PipeStream::inq_record(int *varID, int *levelID)
76 {
77   m_pipe->pipe_inq_record(varID, levelID);
78 }
79 
80 void
defRecord(int varID,int levelID)81 PipeStream::defRecord(int varID, int levelID)
82 {
83   m_pipe->pipe_def_record(varID, levelID);
84 }
85 
86 void
read_record(float * p_data,size_t * p_nmiss)87 PipeStream::read_record(float *p_data, size_t *p_nmiss)
88 {
89   m_nvals += m_pipe->pipe_read_record(m_vlistID, p_data, p_nmiss);
90 }
91 
92 void
read_record(double * p_data,size_t * p_nmiss)93 PipeStream::read_record(double *p_data, size_t *p_nmiss)
94 {
95   m_nvals += m_pipe->pipe_read_record(m_vlistID, p_data, p_nmiss);
96 }
97 
98 void
read_record(Field * p_field,size_t * p_nmiss)99 PipeStream::read_record(Field *p_field, size_t *p_nmiss)
100 {
101   m_nvals += m_pipe->pipe_read_record(m_vlistID, p_field, p_nmiss);
102 }
103 
104 void
write_record(float * p_data,size_t p_nmiss)105 PipeStream::write_record(float *p_data, size_t p_nmiss)
106 {
107   m_pipe->pipe_write_record(p_data, p_nmiss);
108 }
109 
110 void
write_record(double * p_data,size_t p_nmiss)111 PipeStream::write_record(double *p_data, size_t p_nmiss)
112 {
113   m_pipe->pipe_write_record(p_data, p_nmiss);
114 }
115 
116 void
write_record(Field * p_field,size_t p_nmiss)117 PipeStream::write_record(Field *p_field, size_t p_nmiss)
118 {
119   m_pipe->pipe_write_record(p_field, p_nmiss);
120 }
121 
122 void
copyRecord(CdoStreamID p_destination)123 PipeStream::copyRecord(CdoStreamID p_destination)
124 {
125   (void) p_destination;
126   // Not implemented for pipes
127   // Cdi handles this. And in cdo we would have to decompress and recompress for copy operations
128   // which is very resource intensive (also lossy compression)
129   cdo_warning("Copy Record not possible with piped streams");
130 }
131 
132 int
inq_timestep(int p_tsID)133 PipeStream::inq_timestep(int p_tsID)
134 {
135   auto nrecs = m_pipe->pipe_inq_timestep(p_tsID);
136   m_tsID = p_tsID;
137   Debug(PIPE_STREAM, "PipeStream: Current TsID: %d,  nrecs: %d", m_tsID, nrecs);
138   return nrecs;
139 }
140 
141 void
def_timestep(int p_tsID)142 PipeStream::def_timestep(int p_tsID)
143 {
144   Debug(PIPE_STREAM, "%s pstreamID %d", m_pipe->name, m_cdoStreamID);
145   m_pipe->pipe_def_timestep(m_vlistID, p_tsID);
146 }
147 
148 int
inqFileType()149 PipeStream::inqFileType()
150 {
151   return m_filetype;
152 }
153 
154 int
inqByteorder()155 PipeStream::inqByteorder()
156 {
157   return m_filetype;
158 }
159 
160 void
waitForPipe()161 PipeStream::waitForPipe()
162 {
163   m_pipe->close();
164   std::unique_lock<std::mutex> locked_mutex(m_pipe->m_mutex);
165   while (isopen)
166     {
167       Debug(PIPE_STREAM, "wait of read close");
168       m_pipe->isClosed_cond.wait(locked_mutex);
169     }
170 }
171 
172 void
close()173 PipeStream::close()
174 {
175   auto threadID = pthread_self();
176 
177   Debug(PIPE_STREAM,"streamID: %d thID: %ld rthID: %ld wthID: %ld",get_id(), threadID, rthreadID, wthreadID);
178 
179   if (pthread_equal(threadID, rthreadID))
180     {
181       isopen = false;
182       m_pipe->close();
183       pthread_join(wthreadID, nullptr);
184     }
185   else if (pthread_equal(threadID, wthreadID))
186     {
187       waitForPipe();
188     }
189   else
190     {
191       cdo_abort("Internal problem! Close pipe %s", m_name);
192     }
193 }
194 
195 size_t
getNvals()196 PipeStream::getNvals()
197 {
198   return m_nvals;
199 }
200 
201 #endif
202