/*
 *
 * Copyright 2020 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package rls

import (
	"bytes"
	"encoding/json"
	"fmt"
	"time"

	"github.com/golang/protobuf/jsonpb"
	"github.com/golang/protobuf/ptypes"
	durationpb "github.com/golang/protobuf/ptypes/duration"
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/rls/internal/keys"
	rlspb "google.golang.org/grpc/balancer/rls/internal/proto/grpc_lookup_v1"
	"google.golang.org/grpc/internal/grpcutil"
	"google.golang.org/grpc/resolver"
	"google.golang.org/grpc/serviceconfig"
)

const (
	// This is the max duration that we are willing to cache RLS responses. If
	// the service config doesn't specify a value for max_age or if it
	// specifies a value greater than this, we will use this value instead.
	maxMaxAge = 5 * time.Minute
	// If lookup_service_timeout is not specified in the service config, we use
	// a default of 10 seconds.
	defaultLookupServiceTimeout = 10 * time.Second
	// This is set to the targetNameField in the child policy config during
	// service config validation.
	dummyChildPolicyTarget = "target_name_to_be_filled_in_later"
)

// lbConfig contains the parsed and validated contents of the
// loadBalancingConfig section of the service config. The RLS LB policy will
// use this to directly access config data instead of ploughing through proto
// fields.
type lbConfig struct { serviceconfig.LoadBalancingConfig kbMap keys.BuilderMap lookupService string lookupServiceTimeout time.Duration maxAge time.Duration staleAge time.Duration cacheSizeBytes int64 defaultTarget string cpName string cpTargetField string cpConfig map[string]json.RawMessage } func (lbCfg *lbConfig) Equal(other *lbConfig) bool { return lbCfg.kbMap.Equal(other.kbMap) && lbCfg.lookupService == other.lookupService && lbCfg.lookupServiceTimeout == other.lookupServiceTimeout && lbCfg.maxAge == other.maxAge && lbCfg.staleAge == other.staleAge && lbCfg.cacheSizeBytes == other.cacheSizeBytes && lbCfg.defaultTarget == other.defaultTarget && lbCfg.cpName == other.cpName && lbCfg.cpTargetField == other.cpTargetField && cpConfigEqual(lbCfg.cpConfig, other.cpConfig) } func cpConfigEqual(am, bm map[string]json.RawMessage) bool { if (bm == nil) != (am == nil) { return false } if len(bm) != len(am) { return false } for k, jsonA := range am { jsonB, ok := bm[k] if !ok { return false } if !bytes.Equal(jsonA, jsonB) { return false } } return true } // This struct resembles the JSON respresentation of the loadBalancing config // and makes it easier to unmarshal. type lbConfigJSON struct { RouteLookupConfig json.RawMessage ChildPolicy []*loadBalancingConfig ChildPolicyConfigTargetFieldName string } // loadBalancingConfig represents a single load balancing config, // stored in JSON format. // // TODO(easwars): This code seems to be repeated in a few places // (service_config.go and in the xds code as well). Refactor and re-use. type loadBalancingConfig struct { Name string Config json.RawMessage } // MarshalJSON returns a JSON encoding of l. func (l *loadBalancingConfig) MarshalJSON() ([]byte, error) { return nil, fmt.Errorf("rls: loadBalancingConfig.MarshalJSON() is unimplemented") } // UnmarshalJSON parses the JSON-encoded byte slice in data and stores it in l. 
func (l *loadBalancingConfig) UnmarshalJSON(data []byte) error { var cfg map[string]json.RawMessage if err := json.Unmarshal(data, &cfg); err != nil { return err } for name, config := range cfg { l.Name = name l.Config = config } return nil } // ParseConfig parses and validates the JSON representation of the service // config and returns the loadBalancingConfig to be used by the RLS LB policy. // // Helps implement the balancer.ConfigParser interface. // // The following validation checks are performed: // * routeLookupConfig: // ** grpc_keybuilders field: // - must have at least one entry // - must not have two entries with the same Name // - must not have any entry with a Name with the service field unset or // empty // - must not have any entries without a Name // - must not have a headers entry that has required_match set // - must not have two headers entries with the same key within one entry // ** lookup_service field: // - must be set and non-empty and must parse as a target URI // ** max_age field: // - if not specified or is greater than maxMaxAge, it will be reset to // maxMaxAge // ** stale_age field: // - if the value is greater than or equal to max_age, it is ignored // - if set, then max_age must also be set // ** valid_targets field: // - will be ignored // ** cache_size_bytes field: // - must be greater than zero // - TODO(easwars): Define a minimum value for this field, to be used when // left unspecified // * childPolicy field: // - must find a valid child policy with a valid config (the child policy must // be able to parse the provided config successfully when we pass it a dummy // target name in the target_field provided by the // childPolicyConfigTargetFieldName field) // * childPolicyConfigTargetFieldName field: // - must be set and non-empty func (*rlsBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { cfgJSON := &lbConfigJSON{} if err := json.Unmarshal(c, cfgJSON); err != nil { return nil, fmt.Errorf("rls: json 
unmarshal failed for service config {%+v}: %v", string(c), err) } m := jsonpb.Unmarshaler{AllowUnknownFields: true} rlsProto := &rlspb.RouteLookupConfig{} if err := m.Unmarshal(bytes.NewReader(cfgJSON.RouteLookupConfig), rlsProto); err != nil { return nil, fmt.Errorf("rls: bad RouteLookupConfig proto {%+v}: %v", string(cfgJSON.RouteLookupConfig), err) } var childPolicy *loadBalancingConfig for _, lbcfg := range cfgJSON.ChildPolicy { if balancer.Get(lbcfg.Name) != nil { childPolicy = lbcfg break } } kbMap, err := keys.MakeBuilderMap(rlsProto) if err != nil { return nil, err } lookupService := rlsProto.GetLookupService() if lookupService == "" { return nil, fmt.Errorf("rls: empty lookup_service in service config {%+v}", string(c)) } parsedTarget := grpcutil.ParseTarget(lookupService) if parsedTarget.Scheme == "" { parsedTarget.Scheme = resolver.GetDefaultScheme() } if resolver.Get(parsedTarget.Scheme) == nil { return nil, fmt.Errorf("rls: invalid target URI in lookup_service {%s}", lookupService) } lookupServiceTimeout, err := convertDuration(rlsProto.GetLookupServiceTimeout()) if err != nil { return nil, fmt.Errorf("rls: failed to parse lookup_service_timeout in service config {%+v}: %v", string(c), err) } if lookupServiceTimeout == 0 { lookupServiceTimeout = defaultLookupServiceTimeout } maxAge, err := convertDuration(rlsProto.GetMaxAge()) if err != nil { return nil, fmt.Errorf("rls: failed to parse max_age in service config {%+v}: %v", string(c), err) } staleAge, err := convertDuration(rlsProto.GetStaleAge()) if err != nil { return nil, fmt.Errorf("rls: failed to parse staleAge in service config {%+v}: %v", string(c), err) } if staleAge != 0 && maxAge == 0 { return nil, fmt.Errorf("rls: stale_age is set, but max_age is not in service config {%+v}", string(c)) } if staleAge >= maxAge { logger.Info("rls: stale_age {%v} is greater than max_age {%v}, ignoring it", staleAge, maxAge) staleAge = 0 } if maxAge == 0 || maxAge > maxMaxAge { logger.Infof("rls: max_age in 
service config is %v, using %v", maxAge, maxMaxAge) maxAge = maxMaxAge } cacheSizeBytes := rlsProto.GetCacheSizeBytes() if cacheSizeBytes <= 0 { return nil, fmt.Errorf("rls: cache_size_bytes must be greater than 0 in service config {%+v}", string(c)) } if childPolicy == nil { return nil, fmt.Errorf("rls: childPolicy is invalid in service config {%+v}", string(c)) } if cfgJSON.ChildPolicyConfigTargetFieldName == "" { return nil, fmt.Errorf("rls: childPolicyConfigTargetFieldName field is not set in service config {%+v}", string(c)) } // TODO(easwars): When we start instantiating the child policy from the // parent RLS LB policy, we could make this function a method on the // lbConfig object and share the code. We would be parsing the child policy // config again during that time. The only difference betweeen now and then // would be that we would be using real targetField name instead of the // dummy. So, we could make the targetName field a parameter to this // function during the refactor. cpCfg, err := validateChildPolicyConfig(childPolicy, cfgJSON.ChildPolicyConfigTargetFieldName) if err != nil { return nil, err } return &lbConfig{ kbMap: kbMap, lookupService: lookupService, lookupServiceTimeout: lookupServiceTimeout, maxAge: maxAge, staleAge: staleAge, cacheSizeBytes: cacheSizeBytes, defaultTarget: rlsProto.GetDefaultTarget(), // TODO(easwars): Once we refactor validateChildPolicyConfig and make // it a method on the lbConfig object, we could directly store the // balancer.Builder and/or balancer.ConfigParser here instead of the // Name. That would mean that we would have to create the lbConfig // object here first before validating the childPolicy config, but // that's a minor detail. cpName: childPolicy.Name, cpTargetField: cfgJSON.ChildPolicyConfigTargetFieldName, cpConfig: cpCfg, }, nil } // validateChildPolicyConfig validates the child policy config received in the // service config. 
This makes it possible for us to reject service configs // which contain invalid child policy configs which we know will fail for sure. // // It does the following: // * Unmarshals the provided child policy config into a map of string to // json.RawMessage. This allows us to add an entry to the map corresponding // to the targetFieldName that we received in the service config. // * Marshals the map back into JSON, finds the config parser associated with // the child policy and asks it to validate the config. // * If the validation succeeded, removes the dummy entry from the map and // returns it. If any of the above steps failed, it returns an error. func validateChildPolicyConfig(cp *loadBalancingConfig, cpTargetField string) (map[string]json.RawMessage, error) { var childConfig map[string]json.RawMessage if err := json.Unmarshal(cp.Config, &childConfig); err != nil { return nil, fmt.Errorf("rls: json unmarshal failed for child policy config {%+v}: %v", cp.Config, err) } childConfig[cpTargetField], _ = json.Marshal(dummyChildPolicyTarget) jsonCfg, err := json.Marshal(childConfig) if err != nil { return nil, fmt.Errorf("rls: json marshal failed for child policy config {%+v}: %v", childConfig, err) } builder := balancer.Get(cp.Name) if builder == nil { // This should never happen since we already made sure that the child // policy name mentioned in the service config is a valid one. return nil, fmt.Errorf("rls: balancer builder not found for child_policy %v", cp.Name) } parser, ok := builder.(balancer.ConfigParser) if !ok { return nil, fmt.Errorf("rls: balancer builder for child_policy does not implement balancer.ConfigParser: %v", cp.Name) } _, err = parser.ParseConfig(jsonCfg) if err != nil { return nil, fmt.Errorf("rls: childPolicy config validation failed: %v", err) } delete(childConfig, cpTargetField) return childConfig, nil } func convertDuration(d *durationpb.Duration) (time.Duration, error) { if d == nil { return 0, nil } return ptypes.Duration(d) }