1 /*
2  *
3  *  C++ Portable Types Library (PTypes)
4  *  Version 2.1.1  Released 27-Jun-2007
5  *
6  *  Copyright (C) 2001-2007 Hovik Melikyan
7  *
8  *  http://www.melikyan.com/ptypes/
9  *
10  */
11 
12 #include "ptypes.h"
13 #include "pasync.h"
14 #include "pstreams.h"
15 
16 #ifdef WIN32
17 #  include <windows.h>
18 #else
19 #  include <unistd.h>
20 #endif
21 
22 
23 PTYPES_BEGIN
24 
25 
26 //
27 // internal thread class for running units asynchronously
28 //
29 
30 class unit_thread: public thread
31 {
32 protected:
33     unit* target;
34     virtual void execute();
35 public:
36     unit_thread(unit* itarget);
37     virtual ~unit_thread();
38 };
39 
40 
unit_thread(unit * itarget)41 unit_thread::unit_thread(unit* itarget)
42     : thread(false), target(itarget)
43 {
44     start();
45 }
46 
47 
48 
~unit_thread()49 unit_thread::~unit_thread()
50 {
51     waitfor();
52 }
53 
54 
execute()55 void unit_thread::execute()
56 {
57     target->do_main();
58 }
59 
60 
61 //
62 // unit class
63 //
64 
unit()65 unit::unit()
66     : component(), pipe_next(nil), main_thread(nil),
67       running(0), uin(&pin), uout(&pout)
68 {
69 }
70 
71 
~unit()72 unit::~unit()
73 {
74     delete tpexchange<unit_thread>(&main_thread, nil);
75 }
76 
77 
classid()78 int unit::classid()
79 {
80     return CLASS_UNIT;
81 }
82 
83 
main()84 void unit::main()
85 {
86 }
87 
88 
cleanup()89 void unit::cleanup()
90 {
91 }
92 
93 
do_main()94 void unit::do_main()
95 {
96     try
97     {
98         if (!uout->get_active())
99             uout->open();
100         if (!uin->get_active())
101             uin->open();
102         main();
103         if (uout->get_active())
104             uout->flush();
105     }
106     catch(exception* e)
107     {
108         perr.putf("Error: %s\n", pconst(e->get_message()));
109         delete e;
110     }
111 
112     try
113     {
114         cleanup();
115     }
116     catch(exception* e)
117     {
118         perr.putf("Error: %s\n", pconst(e->get_message()));
119         delete e;
120     }
121 
122     if (pipe_next != nil)
123         uout->close();
124 }
125 
126 
connect(unit * next)127 void unit::connect(unit* next)
128 {
129     waitfor();
130     pipe_next = next;
131     infile* in = new infile();
132     outfile* out = new outfile();
133     next->uin = in;
134     uout = out;
135     in->pipe(*out);
136 }
137 
138 
waitfor()139 void unit::waitfor()
140 {
141     if (running == 0)
142         return;
143     delete tpexchange<unit_thread>(&main_thread, nil);
144     unit* next = tpexchange<unit>(&pipe_next, nil);
145     if (next != nil)
146     {
147         next->waitfor();
148         next->uin = &pin;
149     }
150     uout = &pout;
151     running = 0;
152 }
153 
154 
run(bool async)155 void unit::run(bool async)
156 {
157     if (pexchange(&running, 1) != 0)
158         return;
159 
160     if (main_thread != nil)
161         fatal(CRIT_FIRST + 60, "Unit already running");
162 
163     if (pipe_next != nil)
164         pipe_next->run(true);
165 
166     if (async)
167         main_thread = new unit_thread(this);
168     else
169     {
170         do_main();
171         waitfor();
172     }
173 }
174 
175 
176 PTYPES_END
177