1 /*
2   This file is part of CDO. CDO is a collection of Operators to manipulate and analyse Climate model Data.
3 
4   Author: Uwe Schulzweida
5 
6 */
7 
8 #include <cdi.h>
9 
10 #include "cdi_lockedIO.h"
11 #include "cdo_options.h"
12 #include "cdo_output.h"
13 #include "cthread_debug.h"
14 
15 #include <mutex>
16 
17 static std::mutex streamOpenReadMutex;
18 static std::mutex streamMutex;
19 
20 int
stream_open_read_locked(const char * const p_filename)21 stream_open_read_locked(const char *const p_filename)
22 {
23   open_lock();
24   const auto streamID = streamOpenRead(p_filename);
25   open_unlock();
26   if (streamID < 0) cdi_open_error(streamID, "Open failed on >%s<", p_filename);
27 
28   return streamID;
29 }
30 
31 void
stream_close_locked(const int p_fileID)32 stream_close_locked(const int p_fileID)
33 {
34   if (Threading::cdoLockIO) cthread_mutex_lock(streamMutex);
35   streamClose(p_fileID);
36   if (Threading::cdoLockIO) cthread_mutex_unlock(streamMutex);
37 }
38 
39 void
stream_inq_rec_locked(const int p_fileID,int * const p_varID,int * const p_levelID)40 stream_inq_rec_locked(const int p_fileID, int *const p_varID, int *const p_levelID)
41 {
42   if (Threading::cdoLockIO) cthread_mutex_lock(streamMutex);
43   streamInqRecord(p_fileID, p_varID, p_levelID);
44   if (Threading::cdoLockIO) cthread_mutex_unlock(streamMutex);
45 }
46 
47 void
stream_def_rec_locked(const int p_fileID,const int p_varID,const int p_levelID)48 stream_def_rec_locked(const int p_fileID, const int p_varID, const int p_levelID)
49 {
50   if (Threading::cdoLockIO) cthread_mutex_lock(streamMutex);
51   streamDefRecord(p_fileID, p_varID, p_levelID);
52   if (Threading::cdoLockIO) cthread_mutex_unlock(streamMutex);
53 }
54 
55 void
stream_readrecord_float_locked(const int p_fileID,float * const p_data,size_t * const p_nmiss)56 stream_readrecord_float_locked(const int p_fileID, float *const p_data, size_t *const p_nmiss)
57 {
58   if (Threading::cdoLockIO) cthread_mutex_lock(streamMutex);
59   streamReadRecordF(p_fileID, p_data, p_nmiss);
60   if (Threading::cdoLockIO) cthread_mutex_unlock(streamMutex);
61 }
62 
63 void
stream_readrecord_double_locked(const int p_fileID,double * const p_data,size_t * const p_nmiss)64 stream_readrecord_double_locked(const int p_fileID, double *const p_data, size_t *const p_nmiss)
65 {
66   if (Threading::cdoLockIO) cthread_mutex_lock(streamMutex);
67   streamReadRecord(p_fileID, p_data, p_nmiss);
68   if (Threading::cdoLockIO) cthread_mutex_unlock(streamMutex);
69 }
70 
71 void
stream_def_vlist_locked(const int p_fileID,const int p_vlistID)72 stream_def_vlist_locked(const int p_fileID, const int p_vlistID)
73 {
74   if (Threading::cdoLockIO) cthread_mutex_lock(streamMutex);
75   streamDefVlist(p_fileID, p_vlistID);
76   if (Threading::cdoLockIO) cthread_mutex_unlock(streamMutex);
77 }
78 
79 int
stream_inq_vlist_locked(const int p_fileID)80 stream_inq_vlist_locked(const int p_fileID)
81 {
82   if (Threading::cdoLockIO) cthread_mutex_lock(streamMutex);
83   const auto vlistID = streamInqVlist(p_fileID);
84   if (Threading::cdoLockIO) cthread_mutex_unlock(streamMutex);
85 
86   return vlistID;
87 }
88 
89 void
stream_write_record_double_locked(const int p_fileID,const double * const p_data,const size_t p_nmiss)90 stream_write_record_double_locked(const int p_fileID, const double *const p_data, const size_t p_nmiss)
91 {
92   if (Threading::cdoLockIO) cthread_mutex_lock(streamMutex);
93   streamWriteRecord(p_fileID, p_data, p_nmiss);
94   if (Threading::cdoLockIO) cthread_mutex_unlock(streamMutex);
95 }
96 
97 void
stream_write_record_float_locked(const int p_fileID,const float * const p_data,const size_t p_nmiss)98 stream_write_record_float_locked(const int p_fileID, const float *const p_data, const size_t p_nmiss)
99 {
100   if (Threading::cdoLockIO) cthread_mutex_lock(streamMutex);
101   streamWriteRecordF(p_fileID, p_data, p_nmiss);
102   if (Threading::cdoLockIO) cthread_mutex_unlock(streamMutex);
103 }
104 
105 int
stream_inq_time_step_locked(const int p_fileID,const int p_tsID)106 stream_inq_time_step_locked(const int p_fileID, const int p_tsID)
107 {
108   if (Threading::cdoLockIO) cthread_mutex_lock(streamMutex);
109   const auto nrecs = streamInqTimestep(p_fileID, p_tsID);
110   if (Threading::cdoLockIO) cthread_mutex_unlock(streamMutex);
111 
112   return nrecs;
113 }
114 
115 int
stream_def_time_step_locked(const int p_fileID,const int p_tsID)116 stream_def_time_step_locked(const int p_fileID, const int p_tsID)
117 {
118   if (Threading::cdoLockIO) cthread_mutex_lock(streamMutex);
119   const auto success = streamDefTimestep(p_fileID, p_tsID);
120   if (Threading::cdoLockIO) cthread_mutex_unlock(streamMutex);
121   return success;
122 }
123 
124 int
stream_copy_record_locked(const int p_fileID,const int p_targetFileID)125 stream_copy_record_locked(const int p_fileID, const int p_targetFileID)
126 {
127   if (Threading::cdoLockIO) cthread_mutex_lock(streamMutex);
128   streamCopyRecord(p_fileID, p_targetFileID);
129   if (Threading::cdoLockIO) cthread_mutex_unlock(streamMutex);
130   return p_targetFileID;
131 }
132 
133 void
vlist_copy_flag_locked(const int p_vlistID2,const int p_vlistID1)134 vlist_copy_flag_locked(const int p_vlistID2, const int p_vlistID1)
135 {
136   cthread_mutex_lock(streamMutex);
137   vlistCopyFlag(p_vlistID2, p_vlistID1);
138   cthread_mutex_unlock(streamMutex);
139 }
140 
141 void
open_lock(void)142 open_lock(void)
143 {
144   cthread_mutex_lock(Threading::cdoLockIO ? streamMutex : streamOpenReadMutex);
145 }
146 
147 void
open_unlock(void)148 open_unlock(void)
149 {
150   cthread_mutex_unlock(Threading::cdoLockIO ? streamMutex : streamOpenReadMutex);
151 }
152 
153 void
cdo_vlist_copy_flag(const int vlistID2,const int vlistID1)154 cdo_vlist_copy_flag(const int vlistID2, const int vlistID1)
155 {
156   vlist_copy_flag_locked(vlistID2, vlistID1);
157 }
158