1 // -*- c++ -*-
2 //------------------------------------------------------------------------------
3 //                               Conn.cpp
4 //------------------------------------------------------------------------------
5 // $Id: Conn.cpp,v 1.5 2006/07/20 02:30:55 vlg Exp $
6 //------------------------------------------------------------------------------
7 //  Copyright (c) 2003 by Vladislav Grinchenko
8 //
9 //  This program is free software; you can redistribute it and/or
10 //  modify it under the terms of the GNU General Public License
11 //  as published by the Free Software Foundation; either version
12 //  2 of the License, or (at your option) any later version.
13 //------------------------------------------------------------------------------
14 // Created: Thu Apr 17 23:22:39 EDT 2003
15 //------------------------------------------------------------------------------
16 
17 #include "Conn.h"
18 #include "LogServer.h"
19 #include "MonitorConn.h"
20 
21 static const char sep[]="-------------------------------------------------";
22 
23 int
24 Conn::
open()25 open ()
26 {
27 	trace_with_mask("Conn::open",LSVRTRACE);
28 
29 	REACTOR->registerIOHandler (this, get_stream ().getHandler (),
30 								ASSA::READ_EVENT);
31 	DL((LSVR,"+--------------------------------+\n"));
32 	DL((LSVR,"| Accepted new client connection |\n"));
33 	DL((LSVR,"+--------------------------------+\n"));
34 	return 0;
35 }
36 
37 int
38 Conn::
handle_close(int)39 handle_close (int /* fd */)
40 {
41 	trace_with_mask("Conn::handle_close", LSVRTRACE);
42 
43 	DL((LSVR,"+---------------------+\n"));
44 	DL((LSVR,"| Client disconnected |\n"));
45 	DL((LSVR,"+---------------------+\n"));
46 
47 	/** Disconnect from observers
48 	 */
49 	if (m_observers.size ()) {
50 		ASSA::Repository<MonitorConn>::const_iterator cit;
51 		cit = m_observers.begin ();
52 		while (cit != m_observers.end ()) {
53 			(*cit++)->notify (NULL);
54 		}
55 		m_observers.clear ();
56 	}
57 
58 	REPO->erase (this);
59 	delete (this);
60 	return 0;
61 }
62 
63 int
64 Conn::
handle_read(int fd_)65 handle_read (int fd_)
66 {
67 	trace_with_mask ("Conn::handle_read", LSVRTRACE);
68 	if (get_stream ().getHandler () != fd_) {
69 		return (-1);
70 	}
71 
72 	if (m_wstate == wait_for_header) {
73 		int preamble = 0;
74 		m_msg_type = m_msg_size = 0;
75 
76 		get_stream () >> preamble >> m_msg_type >> m_msg_size;
77 
78 		if (preamble != 1234567890) {
79 			DL((LSVRERROR,"Message stream is out of sync - Abort!\n"));
80 			return -1;
81 		}
82 		DL((LSVR,"=> Detected Header\n"));
83 		DL((LSVR,"rcvd: Preamble = %d\n", preamble));
84 		DL((LSVR,"rcvd: Type     = %d\n", m_msg_type));
85 		DL((LSVR,"rcvd: Size     = %d\n", m_msg_size));
86 
87 		switch (m_msg_type)
88 		{
89 		case SIGN_ON:  m_wstate = wait_for_signon;  break;
90 		case SIGN_OFF: m_wstate = wait_for_signoff; break;
91 		case LOG_MSG:  m_wstate = wait_for_logmsg;  break;
92 		}
93 	}
94 
95 	if (m_wstate == wait_for_signon) {
96 		m_maxsize = 0;
97 		DL((LSVR,"=> Incoming SIGN_ON message\n"));
98 		Assure_exit (m_state == closed);
99 		get_stream () >> m_maxsize >> m_app_name >> m_logfname;
100 		DL((LSVR,"rcvd: MaxLogSize = %d, AppName = \"%s\"\n",
101 			m_maxsize, m_app_name.c_str ()));
102 		DL((LSVR,"rcvd: LogFileName = \"%s\"\n", m_logfname.c_str ()));
103 
104 		std::string path = LOGSERVER->get_log_dir () + "/" + m_logfname;
105 
106 		if (LOGSERVER->recycle ()) {
107 			::unlink (path.c_str ());
108 		}
109 		m_sink.open (path.c_str (), std::ios::out | std::ios::app);
110 		if (!m_sink) {
111 			DL((LSVRERROR,"m_sink.open (\"%s\",...) = -1\n", path.c_str ()));
112 			return -1;
113 		}
114 		m_state = opened;
115 		m_wstate = wait_for_header;
116 		REPO->push_back (this);
117 	}
118 	else if (m_wstate == wait_for_signoff) {
119 		DL((LSVR,"=> Incoming SIGN_OFF message\n"));
120 		Assure_exit (m_state == opened);
121 		m_sink << std::flush;
122 		m_sink.close ();
123 		m_state = closed;
124 		return -1;
125 	}
126 	else if (m_wstate == wait_for_logmsg) {
127 		DL((LSVR,"=> Incoming LOG_MSG message\n"));
128 		Assure_exit (m_state == opened);
129 		std::string msg;
130 		if (get_stream () >> msg) {
131 			if (msg.length () != 0) {
132 				DL((LSVR,"rcvs message:\n%s\n%s%s\n", sep, msg.c_str (), sep));
133 				m_bytecount += msg.length ();
134 				if (m_bytecount > m_maxsize) {
135 					shift_logfile ();
136 				}
137 			}
138 			else {
139 				DL((LSVR,"rcvs EMPTY message!\n"));
140 				Assure_exit (false);
141 			}
142 			m_sink << msg << std::flush;
143 			if (m_observers.size ()) {
144 				ASSA::Repository<MonitorConn>::const_iterator cit;
145 				cit = m_observers.begin ();
146 				while (cit != m_observers.end ()) {
147 					(*cit++)->notify (msg.c_str ());
148 				}
149 			}
150 		}
151 		else {
152 			DL((LSVRERROR,"Peer dropped connection!\n"));
153 			m_sink << std::flush;
154 			m_sink.close ();
155 			DL((LSVR,"m_bytecount = %d\n", m_bytecount));
156 			DL((LSVR,"m_maxsize   = %d\n", m_maxsize));
157 		}
158 		m_wstate = wait_for_header;
159 	}
160 
161 	return BYTES_LEFT_IN_SOCKBUF(get_stream ());
162 }
163 
164 void
165 Conn::
shift_logfile()166 shift_logfile ()
167 {
168 	trace_with_mask("Conn::shift_logfile", LSVRTRACE);
169 
170 	m_sink << std::flush;
171 	m_sink.close ();
172 	m_bytecount = 0;
173 	std::string oldfile = m_logfname + ".0";
174 	::unlink (oldfile.c_str ());
175 	::rename (m_logfname.c_str (), oldfile.c_str ());
176 
177 	m_sink.open (m_logfname.c_str (), std::ios::out | std::ios::app);
178 	if (!m_sink) {
179 		DL((LSVRERROR,"m_sink.open (\"%s\",...) = -1\n", m_logfname.c_str ()));
180 	}
181 }
182