1// Copyright 2015 RedHat, Inc. 2// Copyright 2015 CoreOS, Inc. 3// 4// Licensed under the Apache License, Version 2.0 (the "License"); 5// you may not use this file except in compliance with the License. 6// You may obtain a copy of the License at 7// 8// http://www.apache.org/licenses/LICENSE-2.0 9// 10// Unless required by applicable law or agreed to in writing, software 11// distributed under the License is distributed on an "AS IS" BASIS, 12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13// See the License for the specific language governing permissions and 14// limitations under the License. 15 16package sdjournal 17 18import ( 19 "errors" 20 "fmt" 21 "io" 22 "log" 23 "strings" 24 "sync" 25 "time" 26) 27 28var ( 29 // ErrExpired gets returned when the Follow function runs into the 30 // specified timeout. 31 ErrExpired = errors.New("Timeout expired") 32) 33 34// JournalReaderConfig represents options to drive the behavior of a JournalReader. 35type JournalReaderConfig struct { 36 // The Since, NumFromTail and Cursor options are mutually exclusive and 37 // determine where the reading begins within the journal. The order in which 38 // options are written is exactly the order of precedence. 39 Since time.Duration // start relative to a Duration from now 40 NumFromTail uint64 // start relative to the tail 41 Cursor string // start relative to the cursor 42 43 // Show only journal entries whose fields match the supplied values. If 44 // the array is empty, entries will not be filtered. 45 Matches []Match 46 47 // If not empty, the journal instance will point to a journal residing 48 // in this directory. The supplied path may be relative or absolute. 49 Path string 50 51 // If not nil, Formatter will be used to translate the resulting entries 52 // into strings. If not set, the default format (timestamp and message field) 53 // will be used. If Formatter returns an error, Read will stop and return the error. 54 Formatter func(entry *JournalEntry) (string, error) 55} 56 57// JournalReader is an io.ReadCloser which provides a simple interface for iterating through the 58// systemd journal. A JournalReader is not safe for concurrent use by multiple goroutines. 59type JournalReader struct { 60 journal *Journal 61 msgReader *strings.Reader 62 formatter func(entry *JournalEntry) (string, error) 63} 64 65// NewJournalReader creates a new JournalReader with configuration options that are similar to the 66// systemd journalctl tool's iteration and filtering features. 67func NewJournalReader(config JournalReaderConfig) (*JournalReader, error) { 68 // use simpleMessageFormatter as default formatter. 69 if config.Formatter == nil { 70 config.Formatter = simpleMessageFormatter 71 } 72 73 r := &JournalReader{ 74 formatter: config.Formatter, 75 } 76 77 // Open the journal 78 var err error 79 if config.Path != "" { 80 r.journal, err = NewJournalFromDir(config.Path) 81 } else { 82 r.journal, err = NewJournal() 83 } 84 if err != nil { 85 return nil, err 86 } 87 88 // Add any supplied matches 89 for _, m := range config.Matches { 90 if err = r.journal.AddMatch(m.String()); err != nil { 91 return nil, err 92 } 93 } 94 95 // Set the start position based on options 96 if config.Since != 0 { 97 // Start based on a relative time 98 start := time.Now().Add(config.Since) 99 if err := r.journal.SeekRealtimeUsec(uint64(start.UnixNano() / 1000)); err != nil { 100 return nil, err 101 } 102 } else if config.NumFromTail != 0 { 103 // Start based on a number of lines before the tail 104 if err := r.journal.SeekTail(); err != nil { 105 return nil, err 106 } 107 108 // Move the read pointer into position near the tail. Go one further than 109 // the option so that the initial cursor advancement positions us at the 110 // correct starting point. 111 skip, err := r.journal.PreviousSkip(config.NumFromTail + 1) 112 if err != nil { 113 return nil, err 114 } 115 // If we skipped fewer lines than expected, we have reached journal start. 116 // Thus, we seek to head so that next invocation can read the first line. 117 if skip != config.NumFromTail+1 { 118 if err := r.journal.SeekHead(); err != nil { 119 return nil, err 120 } 121 } 122 } else if config.Cursor != "" { 123 // Start based on a custom cursor 124 if err := r.journal.SeekCursor(config.Cursor); err != nil { 125 return nil, err 126 } 127 } 128 129 return r, nil 130} 131 132// Read reads entries from the journal. Read follows the Reader interface so 133// it must be able to read a specific amount of bytes. Journald on the other 134// hand only allows us to read full entries of arbitrary size (without byte 135// granularity). JournalReader is therefore internally buffering entries that 136// don't fit in the read buffer. Callers should keep calling until 0 and/or an 137// error is returned. 138func (r *JournalReader) Read(b []byte) (int, error) { 139 if r.msgReader == nil { 140 // Advance the journal cursor. It has to be called at least one time 141 // before reading 142 c, err := r.journal.Next() 143 144 // An unexpected error 145 if err != nil { 146 return 0, err 147 } 148 149 // EOF detection 150 if c == 0 { 151 return 0, io.EOF 152 } 153 154 entry, err := r.journal.GetEntry() 155 if err != nil { 156 return 0, err 157 } 158 159 // Build a message 160 msg, err := r.formatter(entry) 161 if err != nil { 162 return 0, err 163 } 164 r.msgReader = strings.NewReader(msg) 165 } 166 167 // Copy and return the message 168 sz, err := r.msgReader.Read(b) 169 if err == io.EOF { 170 // The current entry has been fully read. Don't propagate this 171 // EOF, so the next entry can be read at the next Read() 172 // iteration. 173 r.msgReader = nil 174 return sz, nil 175 } 176 if err != nil { 177 return sz, err 178 } 179 if r.msgReader.Len() == 0 { 180 r.msgReader = nil 181 } 182 183 return sz, nil 184} 185 186// Close closes the JournalReader's handle to the journal. 187func (r *JournalReader) Close() error { 188 return r.journal.Close() 189} 190 191// Rewind attempts to rewind the JournalReader to the first entry. 192func (r *JournalReader) Rewind() error { 193 r.msgReader = nil 194 return r.journal.SeekHead() 195} 196 197// Follow synchronously follows the JournalReader, writing each new journal entry to writer. The 198// follow will continue until a single time.Time is received on the until channel. 199func (r *JournalReader) Follow(until <-chan time.Time, writer io.Writer) error { 200 201 // Process journal entries and events. Entries are flushed until the tail or 202 // timeout is reached, and then we wait for new events or the timeout. 203 var msg = make([]byte, 64*1<<(10)) 204 var waitCh = make(chan int, 1) 205 var waitGroup sync.WaitGroup 206 defer waitGroup.Wait() 207 208process: 209 for { 210 c, err := r.Read(msg) 211 if err != nil && err != io.EOF { 212 return err 213 } 214 215 select { 216 case <-until: 217 return ErrExpired 218 default: 219 } 220 if c > 0 { 221 if _, err = writer.Write(msg[:c]); err != nil { 222 return err 223 } 224 continue process 225 } 226 227 // We're at the tail, so wait for new events or time out. 228 // Holds journal events to process. Tightly bounded for now unless there's a 229 // reason to unblock the journal watch routine more quickly. 230 for { 231 waitGroup.Add(1) 232 go func() { 233 status := r.journal.Wait(100 * time.Millisecond) 234 waitCh <- status 235 waitGroup.Done() 236 }() 237 238 select { 239 case <-until: 240 return ErrExpired 241 case e := <-waitCh: 242 switch e { 243 case SD_JOURNAL_NOP: 244 // the journal did not change since the last invocation 245 case SD_JOURNAL_APPEND, SD_JOURNAL_INVALIDATE: 246 continue process 247 default: 248 if e < 0 { 249 return fmt.Errorf("received error event: %d", e) 250 } 251 252 log.Printf("received unknown event: %d\n", e) 253 } 254 } 255 } 256 } 257} 258 259// simpleMessageFormatter is the default formatter. 260// It returns a string representing the current journal entry in a simple format which 261// includes the entry timestamp and MESSAGE field. 262func simpleMessageFormatter(entry *JournalEntry) (string, error) { 263 msg, ok := entry.Fields["MESSAGE"] 264 if !ok { 265 return "", fmt.Errorf("no MESSAGE field present in journal entry") 266 } 267 268 usec := entry.RealtimeTimestamp 269 timestamp := time.Unix(0, int64(usec)*int64(time.Microsecond)) 270 271 return fmt.Sprintf("%s %s\n", timestamp, msg), nil 272} 273