1/* 2Copyright 2019 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package storage 18 19import ( 20 "context" 21 "encoding/json" 22 "fmt" 23 "path" 24 "time" 25 26 "go.etcd.io/etcd/client/pkg/v3/transport" 27 clientv3 "go.etcd.io/etcd/client/v3" 28 "google.golang.org/grpc" 29 apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" 30 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" 31 "k8s.io/apiserver/pkg/registry/generic" 32 "k8s.io/apiserver/pkg/storage/storagebackend" 33) 34 35// EtcdObjectReader provides direct access to custom resource objects stored in etcd. 36type EtcdObjectReader struct { 37 etcdClient *clientv3.Client 38 storagePrefix string 39 crd *apiextensionsv1.CustomResourceDefinition 40} 41 42// NewEtcdObjectReader creates a reader for accessing custom resource objects directly from etcd. 43func NewEtcdObjectReader(etcdClient *clientv3.Client, restOptions *generic.RESTOptions, crd *apiextensionsv1.CustomResourceDefinition) *EtcdObjectReader { 44 return &EtcdObjectReader{etcdClient, restOptions.StorageConfig.Prefix, crd} 45} 46 47// WaitForStorageVersion calls the updateObjFn periodically and waits for the version of the custom resource stored in etcd to be set to the provided version. 48// Typically updateObjFn should perform a noop update to the object so that when stored version of a CRD changes, the object is written at the updated storage version. 49// If the timeout is exceeded a error is returned. 50// This is useful when updating the stored version of an existing CRD because the update does not take effect immediately. 51func (s *EtcdObjectReader) WaitForStorageVersion(version string, ns, name string, timeout time.Duration, updateObjFn func()) error { 52 waitCh := time.After(timeout) 53 for { 54 storage, err := s.GetStoredCustomResource(ns, name) 55 if err != nil { 56 return err 57 } 58 if storage.GetObjectKind().GroupVersionKind().Version == version { 59 return nil 60 } 61 select { 62 case <-waitCh: 63 return fmt.Errorf("timed out after %v waiting for storage version to be %s for object (namespace:%s name:%s)", timeout, version, ns, name) 64 case <-time.After(10 * time.Millisecond): 65 updateObjFn() 66 } 67 } 68} 69 70// GetStoredCustomResource gets the storage representation of a custom resource from etcd. 71func (s *EtcdObjectReader) GetStoredCustomResource(ns, name string) (*unstructured.Unstructured, error) { 72 key := path.Join("/", s.storagePrefix, s.crd.Spec.Group, s.crd.Spec.Names.Plural, ns, name) 73 resp, err := s.etcdClient.KV.Get(context.Background(), key) 74 if err != nil { 75 return nil, fmt.Errorf("error getting storage object %s, %s from etcd at key %s: %v", ns, name, key, err) 76 } 77 if len(resp.Kvs) == 0 { 78 return nil, fmt.Errorf("no storage object found for %s, %s in etcd for key %s", ns, name, key) 79 } 80 raw := resp.Kvs[0].Value 81 u := &unstructured.Unstructured{Object: map[string]interface{}{}} 82 if err := json.Unmarshal(raw, u); err != nil { 83 return nil, fmt.Errorf("error deserializing object %s: %v", string(raw), err) 84 } 85 return u, nil 86} 87 88// SetStoredCustomResource writes the storage representation of a custom resource to etcd. 89func (s *EtcdObjectReader) SetStoredCustomResource(ns, name string, obj *unstructured.Unstructured) error { 90 bs, err := obj.MarshalJSON() 91 if err != nil { 92 return err 93 } 94 95 key := path.Join("/", s.storagePrefix, s.crd.Spec.Group, s.crd.Spec.Names.Plural, ns, name) 96 if _, err := s.etcdClient.KV.Put(context.Background(), key, string(bs)); err != nil { 97 return fmt.Errorf("error setting storage object %s, %s from etcd at key %s: %v", ns, name, key, err) 98 } 99 return nil 100} 101 102// GetEtcdClients returns an initialized clientv3.Client and clientv3.KV. 103func GetEtcdClients(config storagebackend.TransportConfig) (*clientv3.Client, clientv3.KV, error) { 104 tlsInfo := transport.TLSInfo{ 105 CertFile: config.CertFile, 106 KeyFile: config.KeyFile, 107 TrustedCAFile: config.TrustedCAFile, 108 } 109 110 tlsConfig, err := tlsInfo.ClientConfig() 111 if err != nil { 112 return nil, nil, err 113 } 114 115 cfg := clientv3.Config{ 116 Endpoints: config.ServerList, 117 DialTimeout: 20 * time.Second, 118 DialOptions: []grpc.DialOption{ 119 grpc.WithBlock(), // block until the underlying connection is up 120 }, 121 TLS: tlsConfig, 122 } 123 124 c, err := clientv3.New(cfg) 125 if err != nil { 126 return nil, nil, err 127 } 128 129 return c, clientv3.NewKV(c), nil 130} 131