1// Copyright 2015 RedHat, Inc.
2// Copyright 2015 CoreOS, Inc.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16package sdjournal
17
18import (
19	"errors"
20	"fmt"
21	"io"
22	"log"
23	"strings"
24	"sync"
25	"time"
26)
27
// ErrExpired is the error Follow returns once a value has been received on
// its until channel, i.e. when the caller-specified timeout has been reached.
var ErrExpired = errors.New("Timeout expired")
33
// JournalReaderConfig represents options to drive the behavior of a JournalReader.
// It is consumed by NewJournalReader; the zero value opens the default journal,
// applies no filter, performs no initial seek and uses the default formatter.
type JournalReaderConfig struct {
	// The Since, NumFromTail and Cursor options are mutually exclusive and
	// determine where the reading begins within the journal. The order in which
	// options are written is exactly the order of precedence: only the first
	// option holding a non-zero value is applied, and if all three are left at
	// their zero value no initial seek is performed.
	Since       time.Duration // offset added to the current time to get the start time; negative values reach into the past
	NumFromTail uint64        // start this many entries before the tail
	Cursor      string        // start at the position identified by this journal cursor

	// Show only journal entries whose fields match the supplied values. Each
	// element is added to the journal as a match in its String() form. If
	// the slice is empty, entries will not be filtered.
	Matches []Match

	// If not empty, the journal instance will point to a journal residing
	// in this directory. The supplied path may be relative or absolute.
	// If empty, the default journal is opened instead.
	Path string

	// If not nil, Formatter will be used to translate the resulting entries
	// into strings. If not set, the default format (timestamp and message field)
	// will be used. If Formatter returns an error, Read will stop and return the error.
	Formatter func(entry *JournalEntry) (string, error)
}
56
// JournalReader is an io.ReadCloser which provides a simple interface for iterating through the
// systemd journal. A JournalReader is not safe for concurrent use by multiple goroutines.
// Instances are created with NewJournalReader.
type JournalReader struct {
	// journal is the open journal handle that entries are read from.
	// msgReader holds the not yet consumed remainder of the current formatted
	// entry; it is nil whenever no entry is buffered.
	// formatter turns a journal entry into the text handed out by Read.
	journal   *Journal
	msgReader *strings.Reader
	formatter func(entry *JournalEntry) (string, error)
}
64
65// NewJournalReader creates a new JournalReader with configuration options that are similar to the
66// systemd journalctl tool's iteration and filtering features.
67func NewJournalReader(config JournalReaderConfig) (*JournalReader, error) {
68	// use simpleMessageFormatter as default formatter.
69	if config.Formatter == nil {
70		config.Formatter = simpleMessageFormatter
71	}
72
73	r := &JournalReader{
74		formatter: config.Formatter,
75	}
76
77	// Open the journal
78	var err error
79	if config.Path != "" {
80		r.journal, err = NewJournalFromDir(config.Path)
81	} else {
82		r.journal, err = NewJournal()
83	}
84	if err != nil {
85		return nil, err
86	}
87
88	// Add any supplied matches
89	for _, m := range config.Matches {
90		if err = r.journal.AddMatch(m.String()); err != nil {
91			return nil, err
92		}
93	}
94
95	// Set the start position based on options
96	if config.Since != 0 {
97		// Start based on a relative time
98		start := time.Now().Add(config.Since)
99		if err := r.journal.SeekRealtimeUsec(uint64(start.UnixNano() / 1000)); err != nil {
100			return nil, err
101		}
102	} else if config.NumFromTail != 0 {
103		// Start based on a number of lines before the tail
104		if err := r.journal.SeekTail(); err != nil {
105			return nil, err
106		}
107
108		// Move the read pointer into position near the tail. Go one further than
109		// the option so that the initial cursor advancement positions us at the
110		// correct starting point.
111		skip, err := r.journal.PreviousSkip(config.NumFromTail + 1)
112		if err != nil {
113			return nil, err
114		}
115		// If we skipped fewer lines than expected, we have reached journal start.
116		// Thus, we seek to head so that next invocation can read the first line.
117		if skip != config.NumFromTail+1 {
118			if err := r.journal.SeekHead(); err != nil {
119				return nil, err
120			}
121		}
122	} else if config.Cursor != "" {
123		// Start based on a custom cursor
124		if err := r.journal.SeekCursor(config.Cursor); err != nil {
125			return nil, err
126		}
127	}
128
129	return r, nil
130}
131
132// Read reads entries from the journal. Read follows the Reader interface so
133// it must be able to read a specific amount of bytes. Journald on the other
134// hand only allows us to read full entries of arbitrary size (without byte
135// granularity). JournalReader is therefore internally buffering entries that
136// don't fit in the read buffer. Callers should keep calling until 0 and/or an
137// error is returned.
138func (r *JournalReader) Read(b []byte) (int, error) {
139	if r.msgReader == nil {
140		// Advance the journal cursor. It has to be called at least one time
141		// before reading
142		c, err := r.journal.Next()
143
144		// An unexpected error
145		if err != nil {
146			return 0, err
147		}
148
149		// EOF detection
150		if c == 0 {
151			return 0, io.EOF
152		}
153
154		entry, err := r.journal.GetEntry()
155		if err != nil {
156			return 0, err
157		}
158
159		// Build a message
160		msg, err := r.formatter(entry)
161		if err != nil {
162			return 0, err
163		}
164		r.msgReader = strings.NewReader(msg)
165	}
166
167	// Copy and return the message
168	sz, err := r.msgReader.Read(b)
169	if err == io.EOF {
170		// The current entry has been fully read. Don't propagate this
171		// EOF, so the next entry can be read at the next Read()
172		// iteration.
173		r.msgReader = nil
174		return sz, nil
175	}
176	if err != nil {
177		return sz, err
178	}
179	if r.msgReader.Len() == 0 {
180		r.msgReader = nil
181	}
182
183	return sz, nil
184}
185
// Close closes the JournalReader's handle to the journal and returns the
// error, if any, reported by the journal's own Close. An entry that is still
// partially buffered from a previous Read is left untouched.
func (r *JournalReader) Close() error {
	return r.journal.Close()
}
190
// Rewind attempts to rewind the JournalReader to the first entry. It returns
// the error, if any, from seeking the journal to its head.
func (r *JournalReader) Rewind() error {
	// Drop whatever is left of a partially read entry so that the next Read
	// fetches a fresh entry instead of continuing the old one. This happens
	// even if the seek below fails.
	r.msgReader = nil
	return r.journal.SeekHead()
}
196
197// Follow synchronously follows the JournalReader, writing each new journal entry to writer. The
198// follow will continue until a single time.Time is received on the until channel.
199func (r *JournalReader) Follow(until <-chan time.Time, writer io.Writer) error {
200
201	// Process journal entries and events. Entries are flushed until the tail or
202	// timeout is reached, and then we wait for new events or the timeout.
203	var msg = make([]byte, 64*1<<(10))
204	var waitCh = make(chan int, 1)
205	var waitGroup sync.WaitGroup
206	defer waitGroup.Wait()
207
208process:
209	for {
210		c, err := r.Read(msg)
211		if err != nil && err != io.EOF {
212			return err
213		}
214
215		select {
216		case <-until:
217			return ErrExpired
218		default:
219		}
220		if c > 0 {
221			if _, err = writer.Write(msg[:c]); err != nil {
222				return err
223			}
224			continue process
225		}
226
227		// We're at the tail, so wait for new events or time out.
228		// Holds journal events to process. Tightly bounded for now unless there's a
229		// reason to unblock the journal watch routine more quickly.
230		for {
231			waitGroup.Add(1)
232			go func() {
233				status := r.journal.Wait(100 * time.Millisecond)
234				waitCh <- status
235				waitGroup.Done()
236			}()
237
238			select {
239			case <-until:
240				return ErrExpired
241			case e := <-waitCh:
242				switch e {
243				case SD_JOURNAL_NOP:
244					// the journal did not change since the last invocation
245				case SD_JOURNAL_APPEND, SD_JOURNAL_INVALIDATE:
246					continue process
247				default:
248					if e < 0 {
249						return fmt.Errorf("received error event: %d", e)
250					}
251
252					log.Printf("received unknown event: %d\n", e)
253				}
254			}
255		}
256	}
257}
258
259// simpleMessageFormatter is the default formatter.
260// It returns a string representing the current journal entry in a simple format which
261// includes the entry timestamp and MESSAGE field.
262func simpleMessageFormatter(entry *JournalEntry) (string, error) {
263	msg, ok := entry.Fields["MESSAGE"]
264	if !ok {
265		return "", fmt.Errorf("no MESSAGE field present in journal entry")
266	}
267
268	usec := entry.RealtimeTimestamp
269	timestamp := time.Unix(0, int64(usec)*int64(time.Microsecond))
270
271	return fmt.Sprintf("%s %s\n", timestamp, msg), nil
272}
273