1 /*
2 *
3 * C++ Portable Types Library (PTypes)
4 * Version 2.1.1 Released 27-Jun-2007
5 *
6 * Copyright (C) 2001-2007 Hovik Melikyan
7 *
8 * http://www.melikyan.com/ptypes/
9 *
10 */
11
12 #include "ptypes.h"
13 #include "pasync.h"
14 #include "pstreams.h"
15
16 #ifdef WIN32
17 # include <windows.h>
18 #else
19 # include <unistd.h>
20 #endif
21
22
23 PTYPES_BEGIN
24
25
26 //
27 // internal thread class for running units asynchronously
28 //
29
30 class unit_thread: public thread
31 {
32 protected:
33 unit* target;
34 virtual void execute();
35 public:
36 unit_thread(unit* itarget);
37 virtual ~unit_thread();
38 };
39
40
unit_thread(unit * itarget)41 unit_thread::unit_thread(unit* itarget)
42 : thread(false), target(itarget)
43 {
44 start();
45 }
46
47
48
~unit_thread()49 unit_thread::~unit_thread()
50 {
51 waitfor();
52 }
53
54
execute()55 void unit_thread::execute()
56 {
57 target->do_main();
58 }
59
60
61 //
62 // unit class
63 //
64
unit()65 unit::unit()
66 : component(), pipe_next(nil), main_thread(nil),
67 running(0), uin(&pin), uout(&pout)
68 {
69 }
70
71
~unit()72 unit::~unit()
73 {
74 delete tpexchange<unit_thread>(&main_thread, nil);
75 }
76
77
classid()78 int unit::classid()
79 {
80 return CLASS_UNIT;
81 }
82
83
main()84 void unit::main()
85 {
86 }
87
88
cleanup()89 void unit::cleanup()
90 {
91 }
92
93
do_main()94 void unit::do_main()
95 {
96 try
97 {
98 if (!uout->get_active())
99 uout->open();
100 if (!uin->get_active())
101 uin->open();
102 main();
103 if (uout->get_active())
104 uout->flush();
105 }
106 catch(exception* e)
107 {
108 perr.putf("Error: %s\n", pconst(e->get_message()));
109 delete e;
110 }
111
112 try
113 {
114 cleanup();
115 }
116 catch(exception* e)
117 {
118 perr.putf("Error: %s\n", pconst(e->get_message()));
119 delete e;
120 }
121
122 if (pipe_next != nil)
123 uout->close();
124 }
125
126
connect(unit * next)127 void unit::connect(unit* next)
128 {
129 waitfor();
130 pipe_next = next;
131 infile* in = new infile();
132 outfile* out = new outfile();
133 next->uin = in;
134 uout = out;
135 in->pipe(*out);
136 }
137
138
waitfor()139 void unit::waitfor()
140 {
141 if (running == 0)
142 return;
143 delete tpexchange<unit_thread>(&main_thread, nil);
144 unit* next = tpexchange<unit>(&pipe_next, nil);
145 if (next != nil)
146 {
147 next->waitfor();
148 next->uin = &pin;
149 }
150 uout = &pout;
151 running = 0;
152 }
153
154
run(bool async)155 void unit::run(bool async)
156 {
157 if (pexchange(&running, 1) != 0)
158 return;
159
160 if (main_thread != nil)
161 fatal(CRIT_FIRST + 60, "Unit already running");
162
163 if (pipe_next != nil)
164 pipe_next->run(true);
165
166 if (async)
167 main_thread = new unit_thread(this);
168 else
169 {
170 do_main();
171 waitfor();
172 }
173 }
174
175
176 PTYPES_END
177