package mail

import (
	"bufio"
	"bytes"
	"crypto/md5"
	"errors"
	"fmt"
	"io"
	"mime"
	"net/mail"
	"net/textproto"
	"strings"
	"sync"
	"time"
)

// Dec is the WordDecoder used by MimeHeaderDecode to decode MIME headers
// containing RFC 2047 encoded-words.
// It is exported so that an alternative decoder can be installed, e.g. one
// backed by GNU iconv. Another alternative would be to use
// https://godoc.org/golang.org/x/text/encoding
//
// The zero value is the default decoder from the standard library, so no
// explicit initialisation is needed.
var Dec mime.WordDecoder

// maxHeaderChunk is the number of leading bytes of the message that
// ParseHeaders scans when looking for the end of the header (4KB + 1).
const maxHeaderChunk = 1 + (4 << 10)

// Address encodes an email address of the form `<user@host>`.
type Address struct {
	// User is the local part
	User string
	// Host is the domain
	Host string
	// ADL is the at-domain list, if matched
	ADL []string
	// PathParams contains any ESMTP parameters that were matched
	PathParams [][]string
	// NullPath is true if <> was received
	NullPath bool
}

// String returns the address formatted as "user@host".
func (a *Address) String() string {
	return fmt.Sprintf("%s@%s", a.User, a.Host)
}

// IsEmpty reports whether both the user and the host part are empty.
func (a *Address) IsEmpty() bool {
	return a.User == "" && a.Host == ""
}

// ap is the parser used by NewAddress.
var ap = mail.AddressParser{}

// NewAddress takes a string of an RFC 5322 address of the
// form "Gogh Fir <gf@example.com>" or "foo@example.com" and returns its
// user and host parts. An error is returned if the string cannot be parsed
// or the local part is empty.
func NewAddress(str string) (Address, error) {
	parsed, err := ap.Parse(str)
	if err != nil {
		return Address{}, err
	}
	// Split on the LAST '@': the parser hands back quoted local parts
	// unquoted, so the local part itself may contain an '@'.
	pos := strings.LastIndex(parsed.Address, "@")
	if pos <= 0 {
		return Address{}, errors.New("invalid address")
	}
	return Address{
		User: parsed.Address[:pos],
		Host: parsed.Address[pos+1:],
	}, nil
}

// Envelope represents a single SMTP message.
type Envelope struct {
	// Remote IP address
	RemoteIP string
	// Message sent in EHLO command
	Helo string
	// Sender
	MailFrom Address
	// Recipients
	RcptTo []Address
	// Data stores the header and message body
	Data bytes.Buffer
	// Subject stores the subject of the email, extracted and decoded after calling ParseHeaders()
	Subject string
	// TLS is true if the email was received using a TLS connection
	TLS bool
	// Header stores the results from ParseHeaders()
	Header textproto.MIMEHeader
	// Values hold the values generated when processing the envelope by the backend
	Values map[string]interface{}
	// Hashes of each email on the rcpt
	Hashes []string
	// additional delivery header that may be added
	DeliveryHeader string
	// Email(s) will be queued with this id
	QueuedId string
	// When locked, it means that the envelope is being processed by the backend
	sync.Mutex
}

// NewEnvelope allocates an Envelope for the given remote address, with an
// empty Values map and a freshly generated queue id.
func NewEnvelope(remoteAddr string, clientID uint64) *Envelope {
	return &Envelope{
		RemoteIP: remoteAddr,
		Values:   make(map[string]interface{}),
		QueuedId: queuedID(clientID),
	}
}

// queuedID returns the hex-encoded MD5 of the current Unix time (seconds)
// followed by clientID, both written as decimal text.
func queuedID(clientID uint64) string {
	// Format the numbers as decimal digits. A plain string(n) conversion
	// would yield a single rune (U+FFFD for any real timestamp), which made
	// the timestamp contribute nothing to the id.
	seed := fmt.Sprintf("%d%d", time.Now().Unix(), clientID)
	return fmt.Sprintf("%x", md5.Sum([]byte(seed)))
}

// ParseHeaders parses the headers into Header field of the Envelope struct.
// Data buffer must be full before calling.
// Only the first maxHeaderChunk bytes of Data are searched for the blank
// line that ends the header; "header not found" is returned if none is there.
// Decoding of encoding to UTF is only done on the Subject, where the result is assigned to the Subject field.
// Calling it again after Header has been set returns an error.
func (e *Envelope) ParseHeaders() error {
	if e.Header != nil {
		return errors.New("headers already parsed")
	}
	buf := e.Data.Bytes()
	if len(buf) > maxHeaderChunk {
		buf = buf[:maxHeaderChunk]
	}
	end := headerLen(buf)
	if end < 0 {
		return errors.New("header not found")
	}
	reader := textproto.NewReader(bufio.NewReader(bytes.NewReader(buf[:end])))
	var err error
	e.Header, err = reader.ReadMIMEHeader()
	if err == nil || err == io.EOF {
		// decode the subject
		if subject := e.Header["Subject"]; len(subject) > 0 {
			e.Subject = MimeHeaderDecode(subject[0])
		}
	}
	return err
}

// headerLen returns the length of the header section of buf, including the
// blank line that terminates it, or -1 if no blank line is present.
// Both LF and CRLF line endings are recognised.
func headerLen(buf []byte) int {
	for i := 0; i < len(buf); i++ {
		if buf[i] != '\n' {
			continue
		}
		next := i + 1
		if next < len(buf) && buf[next] == '\r' {
			next++
		}
		if next < len(buf) && buf[next] == '\n' {
			return next + 1
		}
	}
	return -1
}

// Len returns the number of bytes that would be in the reader returned by NewReader()
func (e *Envelope) Len() int {
	return len(e.DeliveryHeader) + e.Data.Len()
}

// NewReader returns a new reader for reading the email contents, including the delivery headers
func (e *Envelope) NewReader() io.Reader {
	return io.MultiReader(
		strings.NewReader(e.DeliveryHeader),
		bytes.NewReader(e.Data.Bytes()),
	)
}

// String converts the email to string.
// Typically, you would want to use the compressor guerrilla.Processor for more efficiency, or use NewReader
func (e *Envelope) String() string {
	return e.DeliveryHeader + e.Data.String()
}

// ResetTransaction is called when the transaction is reset (keeping the connection open)
func (e *Envelope) ResetTransaction() {
	// ensure not processing by the backend, will only get lock if finished, otherwise block
	e.Lock()
	// got the lock, it means processing finished
	e.Unlock()

	e.MailFrom = Address{}
	e.RcptTo = []Address{}
	// reset the data buffer, keep it allocated
	e.Data.Reset()

	// todo: these are probably good candidates for buffers / use sync.Pool (after profiling)
	e.Subject = ""
	e.Header = nil
	e.Hashes = make([]string, 0)
	e.DeliveryHeader = ""
	e.Values = make(map[string]interface{})
}

// Reseed is called when used with a new connection, once it's accepted.
// It sets the remote IP, generates a new queue id and clears Helo and TLS.
func (e *Envelope) Reseed(RemoteIP string, clientID uint64) {
	e.RemoteIP = RemoteIP
	e.QueuedId = queuedID(clientID)
	e.Helo = ""
	e.TLS = false
}

// PushRcpt adds a recipient email address to the envelope
func (e *Envelope) PushRcpt(addr Address) {
	e.RcptTo = append(e.RcptTo, addr)
}

// PopRcpt removes the last email address that was pushed to the envelope
// and returns it. If there are no recipients, a zero Address is returned.
func (e *Envelope) PopRcpt() Address {
	n := len(e.RcptTo)
	if n == 0 {
		return Address{}
	}
	last := e.RcptTo[n-1]
	e.RcptTo = e.RcptTo[:n-1]
	return last
}

// MimeHeaderDecode converts 7 bit encoded mime header strings to UTF-8.
// A candidate encoded-word starts at "=?" and runs up to the next space (or
// the end of the string); it is passed to Dec, and copied through unchanged
// if Dec cannot decode it. All other bytes are copied through as they are.
func MimeHeaderDecode(str string) string {
	const (
		stateText   = iota // copying plain text
		stateEquals        // saw '=', which may open an encoded-word
		stateWord          // collecting a candidate encoded-word
	)
	state := stateText
	var word bytes.Buffer
	out := make([]byte, 0, len(str))
	for i := 0; i < len(str); i++ {
		c := str[i]
		switch state {
		case stateText:
			if c == '=' {
				word.WriteByte(c)
				state = stateEquals
			} else {
				out = append(out, c)
			}
		case stateEquals:
			switch c {
			case '?':
				word.WriteByte(c)
				state = stateWord
			case '=':
				// The buffered '=' was a literal; this one may still
				// open an encoded-word, so stay in this state.
				out = append(out, '=')
			default:
				// Not an encoded-word: emit the buffered '=' too,
				// otherwise it would be lost.
				out = append(out, '=', c)
				word.Reset()
				state = stateText
			}
		case stateWord:
			if c == ' ' {
				out = appendDecoded(out, word.String())
				out = append(out, ' ')
				word.Reset()
				state = stateText
			} else {
				word.WriteByte(c)
			}
		}
	}
	if word.Len() > 0 {
		out = appendDecoded(out, word.String())
	}
	return string(out)
}

// appendDecoded appends the decoded form of word to dst, or word itself
// when Dec reports an error.
func appendDecoded(dst []byte, word string) []byte {
	decoded, err := Dec.Decode(word)
	if err != nil {
		return append(dst, word...)
	}
	return append(dst, decoded...)
}

// Envelopes have their own pool

// Pool hands out envelopes, reusing returned ones, and limits how many may
// be borrowed at the same time.
type Pool struct {
	// envelopes that are ready to be borrowed
	pool chan *Envelope
	// semaphore to control number of maximum borrowed envelopes
	sem chan bool
}

// NewPool creates a Pool that keeps at most poolSize idle envelopes and
// allows at most poolSize envelopes to be borrowed at once.
func NewPool(poolSize int) *Pool {
	return &Pool{
		pool: make(chan *Envelope, poolSize),
		sem:  make(chan bool, poolSize),
	}
}

// Borrow returns an envelope for the given connection, reusing an idle one
// (after Reseed) when available and allocating a new one otherwise.
// It blocks while the maximum number of envelopes is borrowed.
func (p *Pool) Borrow(remoteAddr string, clientID uint64) *Envelope {
	var e *Envelope
	p.sem <- true // block the envelope until more room
	select {
	case e = <-p.pool:
		e.Reseed(remoteAddr, clientID)
	default:
		e = NewEnvelope(remoteAddr, clientID)
	}
	return e
}

// Return returns an envelope back to the envelope pool
// Make sure that envelope finished processing before calling this
func (p *Pool) Return(e *Envelope) {
	select {
	case p.pool <- e:
		// placed envelope back in pool
	default:
		// pool is full, discard it
	}
	// take a value off the semaphore to make room for more envelopes
	<-p.sem
}