1 // -*- c++ -*-
2 //------------------------------------------------------------------------------
3 // Conn.cpp
4 //------------------------------------------------------------------------------
5 // $Id: Conn.cpp,v 1.5 2006/07/20 02:30:55 vlg Exp $
6 //------------------------------------------------------------------------------
7 // Copyright (c) 2003 by Vladislav Grinchenko
8 //
9 // This program is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU General Public License
11 // as published by the Free Software Foundation; either version
12 // 2 of the License, or (at your option) any later version.
13 //------------------------------------------------------------------------------
14 // Created: Thu Apr 17 23:22:39 EDT 2003
15 //------------------------------------------------------------------------------
16
17 #include "Conn.h"
18 #include "LogServer.h"
19 #include "MonitorConn.h"
20
namespace {
/** Horizontal rule printed around a received message in the debug log. */
const char sep[] = "-------------------------------------------------";
}
22
23 int
24 Conn::
open()25 open ()
26 {
27 trace_with_mask("Conn::open",LSVRTRACE);
28
29 REACTOR->registerIOHandler (this, get_stream ().getHandler (),
30 ASSA::READ_EVENT);
31 DL((LSVR,"+--------------------------------+\n"));
32 DL((LSVR,"| Accepted new client connection |\n"));
33 DL((LSVR,"+--------------------------------+\n"));
34 return 0;
35 }
36
37 int
38 Conn::
handle_close(int)39 handle_close (int /* fd */)
40 {
41 trace_with_mask("Conn::handle_close", LSVRTRACE);
42
43 DL((LSVR,"+---------------------+\n"));
44 DL((LSVR,"| Client disconnected |\n"));
45 DL((LSVR,"+---------------------+\n"));
46
47 /** Disconnect from observers
48 */
49 if (m_observers.size ()) {
50 ASSA::Repository<MonitorConn>::const_iterator cit;
51 cit = m_observers.begin ();
52 while (cit != m_observers.end ()) {
53 (*cit++)->notify (NULL);
54 }
55 m_observers.clear ();
56 }
57
58 REPO->erase (this);
59 delete (this);
60 return 0;
61 }
62
63 int
64 Conn::
handle_read(int fd_)65 handle_read (int fd_)
66 {
67 trace_with_mask ("Conn::handle_read", LSVRTRACE);
68 if (get_stream ().getHandler () != fd_) {
69 return (-1);
70 }
71
72 if (m_wstate == wait_for_header) {
73 int preamble = 0;
74 m_msg_type = m_msg_size = 0;
75
76 get_stream () >> preamble >> m_msg_type >> m_msg_size;
77
78 if (preamble != 1234567890) {
79 DL((LSVRERROR,"Message stream is out of sync - Abort!\n"));
80 return -1;
81 }
82 DL((LSVR,"=> Detected Header\n"));
83 DL((LSVR,"rcvd: Preamble = %d\n", preamble));
84 DL((LSVR,"rcvd: Type = %d\n", m_msg_type));
85 DL((LSVR,"rcvd: Size = %d\n", m_msg_size));
86
87 switch (m_msg_type)
88 {
89 case SIGN_ON: m_wstate = wait_for_signon; break;
90 case SIGN_OFF: m_wstate = wait_for_signoff; break;
91 case LOG_MSG: m_wstate = wait_for_logmsg; break;
92 }
93 }
94
95 if (m_wstate == wait_for_signon) {
96 m_maxsize = 0;
97 DL((LSVR,"=> Incoming SIGN_ON message\n"));
98 Assure_exit (m_state == closed);
99 get_stream () >> m_maxsize >> m_app_name >> m_logfname;
100 DL((LSVR,"rcvd: MaxLogSize = %d, AppName = \"%s\"\n",
101 m_maxsize, m_app_name.c_str ()));
102 DL((LSVR,"rcvd: LogFileName = \"%s\"\n", m_logfname.c_str ()));
103
104 std::string path = LOGSERVER->get_log_dir () + "/" + m_logfname;
105
106 if (LOGSERVER->recycle ()) {
107 ::unlink (path.c_str ());
108 }
109 m_sink.open (path.c_str (), std::ios::out | std::ios::app);
110 if (!m_sink) {
111 DL((LSVRERROR,"m_sink.open (\"%s\",...) = -1\n", path.c_str ()));
112 return -1;
113 }
114 m_state = opened;
115 m_wstate = wait_for_header;
116 REPO->push_back (this);
117 }
118 else if (m_wstate == wait_for_signoff) {
119 DL((LSVR,"=> Incoming SIGN_OFF message\n"));
120 Assure_exit (m_state == opened);
121 m_sink << std::flush;
122 m_sink.close ();
123 m_state = closed;
124 return -1;
125 }
126 else if (m_wstate == wait_for_logmsg) {
127 DL((LSVR,"=> Incoming LOG_MSG message\n"));
128 Assure_exit (m_state == opened);
129 std::string msg;
130 if (get_stream () >> msg) {
131 if (msg.length () != 0) {
132 DL((LSVR,"rcvs message:\n%s\n%s%s\n", sep, msg.c_str (), sep));
133 m_bytecount += msg.length ();
134 if (m_bytecount > m_maxsize) {
135 shift_logfile ();
136 }
137 }
138 else {
139 DL((LSVR,"rcvs EMPTY message!\n"));
140 Assure_exit (false);
141 }
142 m_sink << msg << std::flush;
143 if (m_observers.size ()) {
144 ASSA::Repository<MonitorConn>::const_iterator cit;
145 cit = m_observers.begin ();
146 while (cit != m_observers.end ()) {
147 (*cit++)->notify (msg.c_str ());
148 }
149 }
150 }
151 else {
152 DL((LSVRERROR,"Peer dropped connection!\n"));
153 m_sink << std::flush;
154 m_sink.close ();
155 DL((LSVR,"m_bytecount = %d\n", m_bytecount));
156 DL((LSVR,"m_maxsize = %d\n", m_maxsize));
157 }
158 m_wstate = wait_for_header;
159 }
160
161 return BYTES_LEFT_IN_SOCKBUF(get_stream ());
162 }
163
164 void
165 Conn::
shift_logfile()166 shift_logfile ()
167 {
168 trace_with_mask("Conn::shift_logfile", LSVRTRACE);
169
170 m_sink << std::flush;
171 m_sink.close ();
172 m_bytecount = 0;
173 std::string oldfile = m_logfname + ".0";
174 ::unlink (oldfile.c_str ());
175 ::rename (m_logfname.c_str (), oldfile.c_str ());
176
177 m_sink.open (m_logfname.c_str (), std::ios::out | std::ios::app);
178 if (!m_sink) {
179 DL((LSVRERROR,"m_sink.open (\"%s\",...) = -1\n", m_logfname.c_str ()));
180 }
181 }
182