1// Copyright 2021 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package pubsub 16 17import ( 18 "context" 19 "fmt" 20 21 "google.golang.org/api/option" 22 23 vkit "cloud.google.com/go/pubsub/apiv1" 24 pb "google.golang.org/genproto/googleapis/pubsub/v1" 25) 26 27// SchemaClient is a Pub/Sub schema client scoped to a single project. 28type SchemaClient struct { 29 sc *vkit.SchemaClient 30 projectID string 31} 32 33// Close closes the schema client and frees up resources. 34func (s *SchemaClient) Close() error { 35 return s.sc.Close() 36} 37 38// NewSchemaClient creates a new Pub/Sub Schema client. 39func NewSchemaClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*SchemaClient, error) { 40 sc, err := vkit.NewSchemaClient(ctx, opts...) 41 if err != nil { 42 return nil, err 43 } 44 return &SchemaClient{sc: sc, projectID: projectID}, nil 45} 46 47// SchemaConfig is a reference to a PubSub schema. 48type SchemaConfig struct { 49 // The name of the schema populated by the server. This field is read-only. 50 Name string 51 52 // The type of the schema definition. 53 Type SchemaType 54 55 // The definition of the schema. This should contain a string representing 56 // the full definition of the schema that is a valid schema definition of 57 // the type specified in `type`. 58 Definition string 59} 60 61// SchemaType is the possible shcema definition types. 62type SchemaType pb.Schema_Type 63 64const ( 65 // SchemaTypeUnspecified is the unused default value. 66 SchemaTypeUnspecified SchemaType = 0 67 // SchemaProtocolBuffer is a protobuf schema definition. 68 SchemaProtocolBuffer SchemaType = 1 69 // SchemaAvro is an Avro schema definition. 70 SchemaAvro SchemaType = 2 71) 72 73// SchemaView is a view of Schema object fields to be returned 74// by GetSchema and ListSchemas. 75type SchemaView pb.SchemaView 76 77const ( 78 // SchemaViewUnspecified is the default/unset value. 79 SchemaViewUnspecified SchemaView = 0 80 // SchemaViewBasic includes the name and type of the schema, but not the definition. 81 SchemaViewBasic SchemaView = 1 82 // SchemaViewFull includes all Schema object fields. 83 SchemaViewFull SchemaView = 2 84) 85 86// SchemaSettings are settings for validating messages 87// published against a schema. 88type SchemaSettings struct { 89 Schema string 90 Encoding SchemaEncoding 91} 92 93func schemaSettingsToProto(schema *SchemaSettings) *pb.SchemaSettings { 94 if schema == nil { 95 return nil 96 } 97 return &pb.SchemaSettings{ 98 Schema: schema.Schema, 99 Encoding: pb.Encoding(schema.Encoding), 100 } 101} 102 103func protoToSchemaSettings(pbs *pb.SchemaSettings) *SchemaSettings { 104 if pbs == nil { 105 return nil 106 } 107 return &SchemaSettings{ 108 Schema: pbs.Schema, 109 Encoding: SchemaEncoding(pbs.Encoding), 110 } 111} 112 113// SchemaEncoding is the encoding expected for messages. 114type SchemaEncoding pb.Encoding 115 116const ( 117 // EncodingUnspecified is the default unused value. 118 EncodingUnspecified SchemaEncoding = 0 119 // EncodingJSON is the JSON encoding type for a message. 120 EncodingJSON SchemaEncoding = 1 121 // EncodingBinary is the binary encoding type for a message. 122 // For some schema types, binary encoding may not be available. 123 EncodingBinary SchemaEncoding = 2 124) 125 126func (s *SchemaConfig) toProto() *pb.Schema { 127 pbs := &pb.Schema{ 128 Name: s.Name, 129 Type: pb.Schema_Type(s.Type), 130 Definition: s.Definition, 131 } 132 return pbs 133} 134 135func protoToSchemaConfig(pbs *pb.Schema) *SchemaConfig { 136 return &SchemaConfig{ 137 Name: pbs.Name, 138 Type: SchemaType(pbs.Type), 139 Definition: pbs.Definition, 140 } 141} 142 143// CreateSchema creates a new schema with the given schemaID 144// and config. Schemas cannot be updated after creation. 145func (c *SchemaClient) CreateSchema(ctx context.Context, schemaID string, s SchemaConfig) (*SchemaConfig, error) { 146 req := &pb.CreateSchemaRequest{ 147 Parent: fmt.Sprintf("projects/%s", c.projectID), 148 Schema: s.toProto(), 149 SchemaId: schemaID, 150 } 151 pbs, err := c.sc.CreateSchema(ctx, req) 152 if err != nil { 153 return nil, err 154 } 155 return protoToSchemaConfig(pbs), nil 156} 157 158// Schema retrieves the configuration of a schema given a schemaID and a view. 159func (c *SchemaClient) Schema(ctx context.Context, schemaID string, view SchemaView) (*SchemaConfig, error) { 160 schemaPath := fmt.Sprintf("projects/%s/schemas/%s", c.projectID, schemaID) 161 req := &pb.GetSchemaRequest{ 162 Name: schemaPath, 163 View: pb.SchemaView(view), 164 } 165 s, err := c.sc.GetSchema(ctx, req) 166 if err != nil { 167 return nil, err 168 } 169 return protoToSchemaConfig(s), nil 170} 171 172// Schemas returns an iterator which returns all of the schemas for the client's project. 173func (c *SchemaClient) Schemas(ctx context.Context, view SchemaView) *SchemaIterator { 174 return &SchemaIterator{ 175 it: c.sc.ListSchemas(ctx, &pb.ListSchemasRequest{ 176 Parent: fmt.Sprintf("projects/%s", c.projectID), 177 View: pb.SchemaView(view), 178 }), 179 } 180} 181 182// SchemaIterator is a struct used to iterate over schemas. 183type SchemaIterator struct { 184 it *vkit.SchemaIterator 185 err error 186} 187 188// Next returns the next schema. If there are no more schemas, iterator.Done will be returned. 189func (s *SchemaIterator) Next() (*SchemaConfig, error) { 190 if s.err != nil { 191 return nil, s.err 192 } 193 pbs, err := s.it.Next() 194 if err != nil { 195 return nil, err 196 } 197 return protoToSchemaConfig(pbs), nil 198} 199 200// DeleteSchema deletes an existing schema given a schema ID. 201func (s *SchemaClient) DeleteSchema(ctx context.Context, schemaID string) error { 202 schemaPath := fmt.Sprintf("projects/%s/schemas/%s", s.projectID, schemaID) 203 return s.sc.DeleteSchema(ctx, &pb.DeleteSchemaRequest{ 204 Name: schemaPath, 205 }) 206} 207 208// ValidateSchemaResult is the response for the ValidateSchema method. 209// Reserved for future use. 210type ValidateSchemaResult struct{} 211 212// ValidateSchema validates a schema config and returns an error if invalid. 213func (s *SchemaClient) ValidateSchema(ctx context.Context, schema SchemaConfig) (*ValidateSchemaResult, error) { 214 req := &pb.ValidateSchemaRequest{ 215 Parent: fmt.Sprintf("projects/%s", s.projectID), 216 Schema: schema.toProto(), 217 } 218 _, err := s.sc.ValidateSchema(ctx, req) 219 if err != nil { 220 return nil, err 221 } 222 return &ValidateSchemaResult{}, nil 223} 224 225// ValidateMessageResult is the response for the ValidateMessage method. 226// Reserved for future use. 227type ValidateMessageResult struct{} 228 229// ValidateMessageWithConfig validates a message against an schema specified 230// by a schema config. 231func (s *SchemaClient) ValidateMessageWithConfig(ctx context.Context, msg []byte, encoding SchemaEncoding, config SchemaConfig) (*ValidateMessageResult, error) { 232 req := &pb.ValidateMessageRequest{ 233 Parent: fmt.Sprintf("projects/%s", s.projectID), 234 SchemaSpec: &pb.ValidateMessageRequest_Schema{ 235 Schema: config.toProto(), 236 }, 237 Message: msg, 238 Encoding: pb.Encoding(encoding), 239 } 240 _, err := s.sc.ValidateMessage(ctx, req) 241 if err != nil { 242 return nil, err 243 } 244 return &ValidateMessageResult{}, nil 245} 246 247// ValidateMessageWithID validates a message against an schema specified 248// by the schema ID of an existing schema. 249func (s *SchemaClient) ValidateMessageWithID(ctx context.Context, msg []byte, encoding SchemaEncoding, schemaID string) (*ValidateMessageResult, error) { 250 req := &pb.ValidateMessageRequest{ 251 Parent: fmt.Sprintf("projects/%s", s.projectID), 252 SchemaSpec: &pb.ValidateMessageRequest_Name{ 253 Name: fmt.Sprintf("projects/%s/schemas/%s", s.projectID, schemaID), 254 }, 255 Message: msg, 256 Encoding: pb.Encoding(encoding), 257 } 258 _, err := s.sc.ValidateMessage(ctx, req) 259 if err != nil { 260 return nil, err 261 } 262 return &ValidateMessageResult{}, nil 263} 264