1// Copyright 2018 Istio Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package server
16
17import (
18	"bytes"
19	"crypto/tls"
20	"crypto/x509"
21	"encoding/json"
22	"errors"
23	"fmt"
24	"io/ioutil"
25	"net/http"
26	"sync"
27	"time"
28
29	"github.com/ghodss/yaml"
30	"github.com/hashicorp/go-multierror"
31	kubeApiAdmission "k8s.io/api/admission/v1beta1"
32	kubeApiApps "k8s.io/api/apps/v1beta1"
33	kubeApisMeta "k8s.io/apimachinery/pkg/apis/meta/v1"
34	"k8s.io/apimachinery/pkg/runtime"
35	"k8s.io/apimachinery/pkg/runtime/serializer"
36
37	"istio.io/pkg/filewatcher"
38	"istio.io/pkg/log"
39
40	"istio.io/istio/mixer/pkg/config/store"
41	"istio.io/istio/pilot/pkg/config/kube/crd"
42	"istio.io/istio/pkg/config/constants"
43	"istio.io/istio/pkg/config/schema/collection"
44	"istio.io/istio/pkg/config/schema/collections"
45	"istio.io/istio/pkg/config/schema/resource"
46)
47
48var scope = log.RegisterScope("validationServer", "validation webhook server", 0)
49
50var (
51	runtimeScheme = runtime.NewScheme()
52	codecs        = serializer.NewCodecFactory(runtimeScheme)
53	deserializer  = codecs.UniversalDeserializer()
54
55	// Expect AdmissionRequest to only include these top-level field names
56	validFields = map[string]bool{
57		"apiVersion": true,
58		"kind":       true,
59		"metadata":   true,
60		"spec":       true,
61		"status":     true,
62	}
63)
64
65func init() {
66	_ = kubeApiApps.AddToScheme(runtimeScheme)
67}
68
69const (
70	HTTPSHandlerReadyPath = "/httpsReady"
71
72	watchDebounceDelay = 100 * time.Millisecond
73)
74
75// Options contains the configuration for the Istio Pilot validation
76// admission controller.
77type Options struct {
78	// MixerValidator implements the backend validator functions for mixer configuration.
79	MixerValidator store.BackendValidator
80
81	// Schemas provides a description of all configuration resources excluding mixer types.
82	Schemas collection.Schemas
83
84	// DomainSuffix is the DNS domain suffix for Pilot CRD resources,
85	// e.g. cluster.local.
86	DomainSuffix string
87
88	// Port where the webhook is served. the number should be greater than 1024 for non-root
89	// user, because non-root user cannot bind port number less than 1024
90	Port uint
91
92	// CertFile is the path to the x509 certificate for https.
93	CertFile string
94
95	// KeyFile is the path to the x509 private key matching `CertFile`.
96	KeyFile string
97
98	// Use an existing mux instead of creating our own.
99	Mux *http.ServeMux
100}
101
102// String produces a stringified version of the arguments for debugging.
103func (o Options) String() string {
104	buf := &bytes.Buffer{}
105
106	_, _ = fmt.Fprintf(buf, "DomainSuffix: %s\n", o.DomainSuffix)
107	_, _ = fmt.Fprintf(buf, "Port: %d\n", o.Port)
108	_, _ = fmt.Fprintf(buf, "CertFile: %s\n", o.CertFile)
109	_, _ = fmt.Fprintf(buf, "KeyFile: %s\n", o.KeyFile)
110
111	return buf.String()
112}
113
114// DefaultArgs allocates an Options struct initialized with Webhook's default configuration.
115func DefaultArgs() Options {
116	return Options{
117		Port:     9443,
118		CertFile: constants.DefaultCertChain,
119		KeyFile:  constants.DefaultKey,
120	}
121}
122
123// Webhook implements the validating admission webhook for validating Istio configuration.
124type Webhook struct {
125	keyCertWatcher filewatcher.FileWatcher
126
127	mu   sync.RWMutex
128	cert *tls.Certificate
129
130	// pilot
131	schemas      collection.Schemas
132	domainSuffix string
133
134	// mixer
135	validator store.BackendValidator
136
137	server   *http.Server
138	keyFile  string
139	certFile string
140}
141
142// Reload the server's cert/key for TLS from file and save it for later use by the https server.
143func (wh *Webhook) reloadKeyCert() {
144	pair, err := ReloadCertkey(wh.certFile, wh.keyFile)
145	if err != nil {
146		return
147	}
148
149	wh.mu.Lock()
150	wh.cert = pair
151	wh.mu.Unlock()
152}
153
154// Reload the server's cert/key for TLS from file.
155func ReloadCertkey(certFile, keyFile string) (*tls.Certificate, error) {
156	pair, err := tls.LoadX509KeyPair(certFile, keyFile)
157	if err != nil {
158		reportValidationCertKeyUpdateError(err)
159		scope.Warnf("Cert/Key reload error: %v", err)
160		return nil, err
161	}
162
163	reportValidationCertKeyUpdate()
164	scope.Info("Cert and Key reloaded")
165
166	var row int
167	for _, cert := range pair.Certificate {
168		if x509Cert, err := x509.ParseCertificates(cert); err != nil {
169			scope.Infof("x509 cert [%v] - ParseCertificates() error: %v\n", row, err)
170			row++
171		} else {
172			for _, c := range x509Cert {
173				scope.Infof("x509 cert [%v] - Issuer: %q, Subject: %q, SN: %x, NotBefore: %q, NotAfter: %q\n",
174					row, c.Issuer, c.Subject, c.SerialNumber,
175					c.NotBefore.Format(time.RFC3339), c.NotAfter.Format(time.RFC3339))
176				row++
177			}
178		}
179	}
180	return &pair, nil
181}
182
183// New creates a new instance of the admission webhook server.
184func New(p Options) (*Webhook, error) {
185	if p.Mux != nil {
186		wh := &Webhook{
187			schemas:   p.Schemas,
188			validator: p.MixerValidator,
189		}
190
191		p.Mux.HandleFunc("/validate", wh.serveValidate)
192		// old handlers retained backwards compatibility during upgrades
193		p.Mux.HandleFunc("/admitpilot", wh.serveAdmitPilot)
194		p.Mux.HandleFunc("/admitmixer", wh.serveAdmitMixer)
195
196		return wh, nil
197	}
198	pair, err := ReloadCertkey(p.CertFile, p.KeyFile)
199	if err != nil {
200		return nil, err
201	}
202
203	// Configuration must be updated whenever the caBundle changes. Watch the parent directory of
204	// the target files so we can catch symlink updates of k8s secrets.
205	keyCertWatcher := filewatcher.NewWatcher()
206
207	for _, file := range []string{p.CertFile, p.KeyFile} {
208		if err := keyCertWatcher.Add(file); err != nil {
209			return nil, fmt.Errorf("could not watch %v: %v", file, err)
210		}
211	}
212
213	wh := &Webhook{
214		server: &http.Server{
215			Addr: fmt.Sprintf(":%v", p.Port),
216		},
217		keyFile:        p.KeyFile,
218		certFile:       p.CertFile,
219		keyCertWatcher: keyCertWatcher,
220		cert:           pair,
221		schemas:        p.Schemas,
222		validator:      p.MixerValidator,
223	}
224
225	// mtls disabled because apiserver webhook cert usage is still TBD.
226	wh.server.TLSConfig = &tls.Config{GetCertificate: wh.getCert}
227	h := http.NewServeMux()
228	h.HandleFunc(HTTPSHandlerReadyPath, wh.serveReady)
229	h.HandleFunc("/validate", wh.serveValidate)
230	// old handlers retained backwards compatibility during upgrades
231	h.HandleFunc("/admitpilot", wh.serveAdmitPilot)
232	h.HandleFunc("/admitmixer", wh.serveAdmitMixer)
233	wh.server.Handler = h
234
235	return wh, nil
236}
237
238//Stop the server
239func (wh *Webhook) Stop() {
240	_ = wh.server.Close()
241}
242
243var readyHook = func() {}
244
245// Run implements the webhook server
246func (wh *Webhook) Run(stopCh <-chan struct{}) {
247
248	if wh.server == nil {
249		// Externally managed
250		return
251	}
252	go func() {
253		if err := wh.server.ListenAndServeTLS("", ""); err != nil && err != http.ErrServerClosed {
254			scope.Fatalf("admission webhook ListenAndServeTLS failed: %v", err)
255		}
256	}()
257	defer func() {
258		wh.Stop()
259	}()
260
261	if readyHook != nil {
262		readyHook()
263	}
264
265	// use a timer to debounce key/cert updates
266	var keyCertTimerC <-chan time.Time
267
268	for {
269		select {
270		case <-keyCertTimerC:
271			keyCertTimerC = nil
272			wh.reloadKeyCert()
273		case <-wh.keyCertWatcher.Events(wh.keyFile):
274			if keyCertTimerC == nil {
275				keyCertTimerC = time.After(watchDebounceDelay)
276			}
277		case <-wh.keyCertWatcher.Events(wh.certFile):
278			if keyCertTimerC == nil {
279				keyCertTimerC = time.After(watchDebounceDelay)
280			}
281		case err := <-wh.keyCertWatcher.Errors(wh.keyFile):
282			scope.Errorf("configWatcher error: %v", err)
283		case err := <-wh.keyCertWatcher.Errors(wh.certFile):
284			scope.Errorf("configWatcher error: %v", err)
285		case <-stopCh:
286			return
287		}
288	}
289}
290
291func (wh *Webhook) getCert(*tls.ClientHelloInfo) (*tls.Certificate, error) {
292	wh.mu.Lock()
293	defer wh.mu.Unlock()
294	return wh.cert, nil
295}
296
297func toAdmissionResponse(err error) *kubeApiAdmission.AdmissionResponse {
298	return &kubeApiAdmission.AdmissionResponse{Result: &kubeApisMeta.Status{Message: err.Error()}}
299}
300
301type admitFunc func(*kubeApiAdmission.AdmissionRequest) *kubeApiAdmission.AdmissionResponse
302
303func serve(w http.ResponseWriter, r *http.Request, admit admitFunc) {
304	var body []byte
305	if r.Body != nil {
306		if data, err := ioutil.ReadAll(r.Body); err == nil {
307			body = data
308		}
309	}
310	if len(body) == 0 {
311		reportValidationHTTPError(http.StatusBadRequest)
312		http.Error(w, "no body found", http.StatusBadRequest)
313		return
314	}
315
316	// verify the content type is accurate
317	contentType := r.Header.Get("Content-Type")
318	if contentType != "application/json" {
319		reportValidationHTTPError(http.StatusUnsupportedMediaType)
320		http.Error(w, "invalid Content-Type, want `application/json`", http.StatusUnsupportedMediaType)
321		return
322	}
323
324	var reviewResponse *kubeApiAdmission.AdmissionResponse
325	ar := kubeApiAdmission.AdmissionReview{}
326	if _, _, err := deserializer.Decode(body, nil, &ar); err != nil {
327		reviewResponse = toAdmissionResponse(fmt.Errorf("could not decode body: %v", err))
328	} else {
329		reviewResponse = admit(ar.Request)
330	}
331
332	response := kubeApiAdmission.AdmissionReview{}
333	if reviewResponse != nil {
334		response.Response = reviewResponse
335		if ar.Request != nil {
336			response.Response.UID = ar.Request.UID
337		}
338	}
339
340	resp, err := json.Marshal(response)
341	if err != nil {
342		reportValidationHTTPError(http.StatusInternalServerError)
343		http.Error(w, fmt.Sprintf("could encode response: %v", err), http.StatusInternalServerError)
344		return
345	}
346	if _, err := w.Write(resp); err != nil {
347		reportValidationHTTPError(http.StatusInternalServerError)
348		http.Error(w, fmt.Sprintf("could write response: %v", err), http.StatusInternalServerError)
349	}
350}
351
352func (wh *Webhook) serveReady(w http.ResponseWriter, _ *http.Request) {
353	w.WriteHeader(http.StatusOK)
354}
355
356func (wh *Webhook) serveAdmitPilot(w http.ResponseWriter, r *http.Request) {
357	serve(w, r, wh.admitPilot)
358}
359
360func (wh *Webhook) serveAdmitMixer(w http.ResponseWriter, r *http.Request) {
361	serve(w, r, wh.admitMixer)
362}
363
364func (wh *Webhook) serveValidate(w http.ResponseWriter, r *http.Request) {
365	serve(w, r, wh.validate)
366}
367
368func (wh *Webhook) validate(request *kubeApiAdmission.AdmissionRequest) *kubeApiAdmission.AdmissionResponse {
369	switch request.Kind.Kind {
370	case collections.IstioPolicyV1Beta1Rules.Resource().Kind(),
371		collections.IstioPolicyV1Beta1Attributemanifests.Resource().Kind(),
372		collections.IstioConfigV1Alpha2Adapters.Resource().Kind(),
373		collections.IstioPolicyV1Beta1Handlers.Resource().Kind(),
374		collections.IstioPolicyV1Beta1Instances.Resource().Kind(),
375		collections.IstioConfigV1Alpha2Templates.Resource().Kind():
376		return wh.admitMixer(request)
377	default:
378		return wh.admitPilot(request)
379	}
380}
381
382func (wh *Webhook) admitPilot(request *kubeApiAdmission.AdmissionRequest) *kubeApiAdmission.AdmissionResponse {
383	switch request.Operation {
384	case kubeApiAdmission.Create, kubeApiAdmission.Update:
385	default:
386		scope.Warnf("Unsupported webhook operation %v", request.Operation)
387		reportValidationFailed(request, reasonUnsupportedOperation)
388		return &kubeApiAdmission.AdmissionResponse{Allowed: true}
389	}
390
391	var obj crd.IstioKind
392	if err := yaml.Unmarshal(request.Object.Raw, &obj); err != nil {
393		scope.Infof("cannot decode configuration: %v", err)
394		reportValidationFailed(request, reasonYamlDecodeError)
395		return toAdmissionResponse(fmt.Errorf("cannot decode configuration: %v", err))
396	}
397
398	gvk := obj.GroupVersionKind()
399
400	// TODO(jasonwzm) remove this when multi-version is supported. v1beta1 shares the same
401	// schema as v1lalpha3. Fake conversion and validate against v1alpha3.
402	if gvk.Group == "networking.istio.io" && gvk.Version == "v1beta1" {
403		gvk.Version = "v1alpha3"
404	}
405	s, exists := wh.schemas.FindByGroupVersionKind(resource.FromKubernetesGVK(&gvk))
406	if !exists {
407		scope.Infof("unrecognized type %v", obj.Kind)
408		reportValidationFailed(request, reasonUnknownType)
409		return toAdmissionResponse(fmt.Errorf("unrecognized type %v", obj.Kind))
410	}
411
412	out, err := crd.ConvertObject(s, &obj, wh.domainSuffix)
413	if err != nil {
414		scope.Infof("error decoding configuration: %v", err)
415		reportValidationFailed(request, reasonCRDConversionError)
416		return toAdmissionResponse(fmt.Errorf("error decoding configuration: %v", err))
417	}
418
419	if err := s.Resource().ValidateProto(out.Name, out.Namespace, out.Spec); err != nil {
420		scope.Infof("configuration is invalid: %v", err)
421		reportValidationFailed(request, reasonInvalidConfig)
422		return toAdmissionResponse(fmt.Errorf("configuration is invalid: %v", err))
423	}
424
425	if reason, err := checkFields(request.Object.Raw, request.Kind.Kind, request.Namespace, obj.Name); err != nil {
426		reportValidationFailed(request, reason)
427		return toAdmissionResponse(err)
428	}
429
430	reportValidationPass(request)
431	return &kubeApiAdmission.AdmissionResponse{Allowed: true}
432}
433
434func (wh *Webhook) admitMixer(request *kubeApiAdmission.AdmissionRequest) *kubeApiAdmission.AdmissionResponse {
435	ev := &store.BackendEvent{
436		Key: store.Key{
437			Namespace: request.Namespace,
438			Kind:      request.Kind.Kind,
439		},
440	}
441	switch request.Operation {
442	case kubeApiAdmission.Create, kubeApiAdmission.Update:
443		ev.Type = store.Update
444		var obj crd.IstioKind
445		if err := yaml.Unmarshal(request.Object.Raw, &obj); err != nil {
446			reportValidationFailed(request, reasonYamlDecodeError)
447			return toAdmissionResponse(fmt.Errorf("cannot decode configuration: %v", err))
448		}
449
450		ev.Value = &store.BackEndResource{
451			Metadata: store.ResourceMeta{
452				Name:        obj.Name,
453				Namespace:   obj.Namespace,
454				Labels:      obj.Labels,
455				Annotations: obj.Annotations,
456				Revision:    obj.ResourceVersion,
457			},
458			Spec: obj.Spec,
459		}
460		ev.Key.Name = ev.Value.Metadata.Name
461
462		if reason, err := checkFields(request.Object.Raw, request.Kind.Kind, request.Namespace, ev.Key.Name); err != nil {
463			reportValidationFailed(request, reason)
464			return toAdmissionResponse(err)
465		}
466
467	case kubeApiAdmission.Delete:
468		if request.Name == "" {
469			reportValidationFailed(request, reasonUnknownType)
470			return toAdmissionResponse(fmt.Errorf("illformed request: name not found on delete request"))
471		}
472		ev.Type = store.Delete
473		ev.Key.Name = request.Name
474	default:
475		scope.Warnf("Unsupported webhook operation %v", request.Operation)
476		reportValidationFailed(request, reasonUnsupportedOperation)
477		return &kubeApiAdmission.AdmissionResponse{Allowed: true}
478	}
479
480	// webhook skips deletions
481	if ev.Type == store.Update {
482		if err := wh.validator.Validate(ev); err != nil {
483			reportValidationFailed(request, reasonInvalidConfig)
484			return toAdmissionResponse(err)
485		}
486	}
487
488	reportValidationPass(request)
489	return &kubeApiAdmission.AdmissionResponse{Allowed: true}
490}
491
492func checkFields(raw []byte, kind string, namespace string, name string) (string, error) {
493	trial := make(map[string]json.RawMessage)
494	if err := yaml.Unmarshal(raw, &trial); err != nil {
495		scope.Infof("cannot decode configuration fields: %v", err)
496		return reasonYamlDecodeError, fmt.Errorf("cannot decode configuration fields: %v", err)
497	}
498
499	for key := range trial {
500		if _, ok := validFields[key]; !ok {
501			scope.Infof("unknown field %q on %s resource %s/%s",
502				key, kind, namespace, name)
503			return reasonInvalidConfig, fmt.Errorf("unknown field %q on %s resource %s/%s",
504				key, kind, namespace, name)
505		}
506	}
507
508	return "", nil
509}
510
511// validatePort checks that the network port is in range
512func validatePort(port int) error {
513	if 1 <= port && port <= 65535 {
514		return nil
515	}
516	return fmt.Errorf("port number %d must be in the range 1..65535", port)
517}
518
519// Validate tests if the Options has valid params.
520func (o Options) Validate() error {
521	var errs *multierror.Error
522	if len(o.CertFile) == 0 {
523		errs = multierror.Append(errs, errors.New("cert file not specified"))
524	}
525	if len(o.KeyFile) == 0 {
526		errs = multierror.Append(errs, errors.New("key file not specified"))
527	}
528	if err := validatePort(int(o.Port)); err != nil {
529		errs = multierror.Append(errs, err)
530	}
531	return errs.ErrorOrNil()
532}
533