1// Copyright 2019 Istio Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package fs
16
17import (
18	"fmt"
19	"io/ioutil"
20	"os"
21	"path/filepath"
22	"sync"
23	"syscall"
24
25	"istio.io/pkg/appsignals"
26
27	"istio.io/istio/galley/pkg/config/scope"
28	"istio.io/istio/galley/pkg/config/source/kube/inmemory"
29	"istio.io/istio/pkg/config/event"
30	"istio.io/istio/pkg/config/schema/collection"
31)
32
33var (
34	supportedExtensions = map[string]bool{
35		".yaml": true,
36		".yml":  true,
37	}
38)
39
40var nameDiscriminator int64
41
42type source struct {
43	mu               sync.Mutex
44	name             string
45	s                *inmemory.KubeSource
46	root             string
47	done             chan struct{}
48	watchConfigFiles bool
49}
50
51var _ event.Source = &source{}
52
53// New returns a new filesystem based processor.Source.
54func New(root string, schemas collection.Schemas, watchConfigFiles bool) (event.Source, error) {
55	src := inmemory.NewKubeSource(schemas)
56	name := fmt.Sprintf("fs-%d", nameDiscriminator)
57	nameDiscriminator++
58
59	s := &source{
60		name:             name,
61		root:             root,
62		s:                src,
63		watchConfigFiles: watchConfigFiles,
64	}
65
66	return s, nil
67}
68
69// Start implements processor.Source
70func (s *source) Start() {
71	s.mu.Lock()
72	defer s.mu.Unlock()
73
74	if s.done != nil {
75		return
76	}
77	done := make(chan struct{})
78	s.done = done
79
80	c := make(chan appsignals.Signal, 1)
81	appsignals.Watch(c)
82	shut := make(chan os.Signal, 1)
83	if s.watchConfigFiles {
84		if err := appsignals.FileTrigger(s.root, syscall.SIGUSR1, shut); err != nil {
85			scope.Source.Errorf("Unable to setup FileTrigger for %s: %v", s.root, err)
86		}
87	}
88	go func() {
89		s.reload()
90		s.s.Start()
91		for {
92			select {
93			case trigger := <-c:
94				if trigger.Signal == syscall.SIGUSR1 {
95					scope.Source.Infof("[%s] Triggering reload in response to: %v", s.name, trigger.Source)
96					s.reload()
97				}
98			case <-done:
99				if s.watchConfigFiles {
100					shut <- syscall.SIGTERM
101				}
102				return
103			}
104		}
105	}()
106}
107
108// Stop implements processor.Source.
109func (s *source) Stop() {
110	scope.Source.Debugf("fs.Source.Stop >>>")
111	defer scope.Source.Debugf("fs.Source.Stop <<<")
112	s.mu.Lock()
113	defer s.mu.Unlock()
114	if s.done == nil {
115		return
116	}
117	close(s.done)
118	s.s.Stop()
119	s.s.Clear()
120	s.done = nil
121}
122
123// Dispatch implements event.Source
124func (s *source) Dispatch(h event.Handler) {
125	s.s.Dispatch(h)
126}
127
128func (s *source) reload() {
129	s.mu.Lock()
130	defer s.mu.Unlock()
131
132	scope.Source.Debugf("[%s] Begin reloading files...", s.name)
133	names := s.s.ContentNames()
134
135	err := filepath.Walk(s.root, func(path string, info os.FileInfo, err error) error {
136		if err != nil {
137			return err
138		}
139
140		if mode := info.Mode() & os.ModeType; !supportedExtensions[filepath.Ext(path)] || (mode != 0 && mode != os.ModeSymlink) {
141			return nil
142		}
143
144		scope.Source.Infof("[%s] Discovered file: %q", s.name, path)
145
146		data, err := ioutil.ReadFile(path)
147		if err != nil {
148			scope.Source.Infof("[%s] Error reading file %q: %v", s.name, path, err)
149			return err
150		}
151
152		if err := s.s.ApplyContent(path, string(data)); err != nil {
153			scope.Source.Errorf("[%s] Error applying file contents(%q): %v", s.name, path, err)
154		}
155		delete(names, path)
156		return nil
157	})
158
159	if err != nil {
160		scope.Source.Errorf("Error walking path during reload: %v", err)
161		return
162	}
163
164	for n := range names {
165		scope.Source.Infof("Removing the contents of the file %q", n)
166
167		s.s.RemoveContent(n)
168	}
169
170	scope.Source.Debugf("[%s] Completed reloading files...", s.name)
171}
172