1 /*-
2 * Copyright (c) 2011, 2012, 2013, 2014 Spectra Logic Corporation
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions, and the following disclaimer,
10 * without modification.
11 * 2. Redistributions in binary form must reproduce at minimum a disclaimer
12 * substantially similar to the "NO WARRANTY" disclaimer below
13 * ("Disclaimer") and any redistribution must be conditioned upon
14 * including a substantially similar Disclaimer requirement for further
15 * binary redistribution.
16 *
17 * NO WARRANTY
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTIBILITY AND FITNESS FOR
21 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 * HOLDERS OR CONTRIBUTORS BE LIABLE FOR SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
26 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
27 * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGES.
29 *
30 * Authors: Justin T. Gibbs (Spectra Logic Corporation)
31 */
32
33 /**
34 * \file consumer.cc
35 */
36
37 #include <sys/cdefs.h>
38 #include <sys/poll.h>
39 #include <sys/socket.h>
40 #include <sys/un.h>
41
42 #include <err.h>
43 #include <errno.h>
44 #include <fcntl.h>
45 #include <syslog.h>
46 #include <unistd.h>
47
48 #include <cstdarg>
49 #include <cstring>
50 #include <list>
51 #include <map>
52 #include <string>
53
54 #include "guid.h"
55 #include "event.h"
56 #include "event_factory.h"
57 #include "exception.h"
58
59 #include "consumer.h"
60 /*================================== Macros ==================================*/
/*
 * Number of elements in the array x.  x must be an actual array (not a
 * pointer); the argument is parenthesized so that any array-valued
 * expression can be passed safely.
 */
#define NUM_ELEMENTS(x) (sizeof(x) / sizeof(*(x)))
62
63 /*============================ Namespace Control =============================*/
64 using std::string;
65 namespace DevdCtl
66 {
67
68 /*============================= Class Definitions ============================*/
69 /*----------------------------- DevdCtl::Consumer ----------------------------*/
70 //- Consumer Static Private Data -----------------------------------------------
71 const char Consumer::s_devdSockPath[] = "/var/run/devd.seqpacket.pipe";
72
73 //- Consumer Public Methods ----------------------------------------------------
Consumer(Event::BuildMethod * defBuilder,EventFactory::Record * regEntries,size_t numEntries)74 Consumer::Consumer(Event::BuildMethod *defBuilder,
75 EventFactory::Record *regEntries,
76 size_t numEntries)
77 : m_devdSockFD(-1),
78 m_eventFactory(defBuilder),
79 m_replayingEvents(false)
80 {
81 m_eventFactory.UpdateRegistry(regEntries, numEntries);
82 }
83
~Consumer()84 Consumer::~Consumer()
85 {
86 DisconnectFromDevd();
87 }
88
89 bool
ConnectToDevd()90 Consumer::ConnectToDevd()
91 {
92 struct sockaddr_un devdAddr;
93 int sLen;
94 int result;
95
96 if (m_devdSockFD != -1) {
97 /* Already connected. */
98 syslog(LOG_DEBUG, "%s: Already connected.", __func__);
99 return (true);
100 }
101 syslog(LOG_INFO, "%s: Connecting to devd.", __func__);
102
103 memset(&devdAddr, 0, sizeof(devdAddr));
104 devdAddr.sun_family= AF_UNIX;
105 strlcpy(devdAddr.sun_path, s_devdSockPath, sizeof(devdAddr.sun_path));
106 sLen = SUN_LEN(&devdAddr);
107
108 m_devdSockFD = socket(AF_UNIX, SOCK_SEQPACKET | SOCK_NONBLOCK, 0);
109 if (m_devdSockFD == -1)
110 err(1, "Unable to create socket");
111 result = connect(m_devdSockFD,
112 reinterpret_cast<sockaddr *>(&devdAddr),
113 sLen);
114 if (result == -1) {
115 syslog(LOG_INFO, "Unable to connect to devd");
116 DisconnectFromDevd();
117 return (false);
118 }
119
120 syslog(LOG_INFO, "Connection to devd successful");
121 return (true);
122 }
123
124 void
DisconnectFromDevd()125 Consumer::DisconnectFromDevd()
126 {
127 if (m_devdSockFD != -1) {
128 syslog(LOG_INFO, "Disconnecting from devd.");
129 close(m_devdSockFD);
130 }
131 m_devdSockFD = -1;
132 }
133
134 std::string
ReadEvent()135 Consumer::ReadEvent()
136 {
137 char buf[MAX_EVENT_SIZE + 1];
138 ssize_t len;
139
140 len = ::recv(m_devdSockFD, buf, MAX_EVENT_SIZE, MSG_WAITALL);
141 if (len == -1)
142 return (std::string(""));
143 else {
144 /* NULL-terminate the result */
145 buf[len] = '\0';
146 return (std::string(buf));
147 }
148 }
149
150 void
ReplayUnconsumedEvents(bool discardUnconsumed)151 Consumer::ReplayUnconsumedEvents(bool discardUnconsumed)
152 {
153 EventList::iterator event(m_unconsumedEvents.begin());
154 bool replayed_any = (event != m_unconsumedEvents.end());
155
156 m_replayingEvents = true;
157 if (replayed_any)
158 syslog(LOG_INFO, "Started replaying unconsumed events");
159 while (event != m_unconsumedEvents.end()) {
160 bool consumed((*event)->Process());
161 if (consumed || discardUnconsumed) {
162 delete *event;
163 event = m_unconsumedEvents.erase(event);
164 } else {
165 event++;
166 }
167 }
168 if (replayed_any)
169 syslog(LOG_INFO, "Finished replaying unconsumed events");
170 m_replayingEvents = false;
171 }
172
173 bool
SaveEvent(const Event & event)174 Consumer::SaveEvent(const Event &event)
175 {
176 if (m_replayingEvents)
177 return (false);
178 m_unconsumedEvents.push_back(event.DeepCopy());
179 return (true);
180 }
181
182 Event *
NextEvent()183 Consumer::NextEvent()
184 {
185 if (!Connected())
186 return(NULL);
187
188 Event *event(NULL);
189 try {
190 string evString;
191
192 evString = ReadEvent();
193 if (! evString.empty()) {
194 Event::TimestampEventString(evString);
195 event = Event::CreateEvent(m_eventFactory, evString);
196 }
197 } catch (const Exception &exp) {
198 exp.Log();
199 DisconnectFromDevd();
200 }
201 return (event);
202 }
203
204 /* Capture and process buffered events. */
205 void
ProcessEvents()206 Consumer::ProcessEvents()
207 {
208 Event *event;
209 while ((event = NextEvent()) != NULL) {
210 if (event->Process())
211 SaveEvent(*event);
212 delete event;
213 }
214 }
215
216 void
FlushEvents()217 Consumer::FlushEvents()
218 {
219 std::string s;
220
221 do
222 s = ReadEvent();
223 while (! s.empty()) ;
224 }
225
226 bool
EventsPending()227 Consumer::EventsPending()
228 {
229 struct pollfd fds[1];
230 int result;
231
232 do {
233 fds->fd = m_devdSockFD;
234 fds->events = POLLIN;
235 fds->revents = 0;
236 result = poll(fds, NUM_ELEMENTS(fds), /*timeout*/0);
237 } while (result == -1 && errno == EINTR);
238
239 if (result == -1)
240 err(1, "Polling for devd events failed");
241
242 if ((fds->revents & POLLERR) != 0)
243 throw Exception("Consumer::EventsPending(): "
244 "POLLERR detected on devd socket.");
245
246 if ((fds->revents & POLLHUP) != 0)
247 throw Exception("Consumer::EventsPending(): "
248 "POLLHUP detected on devd socket.");
249
250 return ((fds->revents & POLLIN) != 0);
251 }
252
253 } // namespace DevdCtl
254