1// Copyright 2021 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package pubsub
16
17import (
18	"context"
19	"fmt"
20
21	"google.golang.org/api/option"
22
23	vkit "cloud.google.com/go/pubsub/apiv1"
24	pb "google.golang.org/genproto/googleapis/pubsub/v1"
25)
26
// SchemaClient is a Pub/Sub schema client scoped to a single project.
//
// It wraps the generated schema client (sc) and keeps the project ID that is
// used to build the parent and schema resource names sent in requests.
type SchemaClient struct {
	sc        *vkit.SchemaClient
	projectID string
}
32
33// Close closes the schema client and frees up resources.
34func (s *SchemaClient) Close() error {
35	return s.sc.Close()
36}
37
38// NewSchemaClient creates a new Pub/Sub Schema client.
39func NewSchemaClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*SchemaClient, error) {
40	sc, err := vkit.NewSchemaClient(ctx, opts...)
41	if err != nil {
42		return nil, err
43	}
44	return &SchemaClient{sc: sc, projectID: projectID}, nil
45}
46
// SchemaConfig is a reference to a PubSub schema. It carries the schema's
// name, the type of its definition and the definition text itself.
type SchemaConfig struct {
	// The name of the schema populated by the server. This field is read-only.
	Name string

	// The type of the schema definition.
	Type SchemaType

	// The definition of the schema. This should contain a string representing
	// the full definition of the schema that is a valid schema definition of
	// the type specified in the Type field.
	Definition string
}
60
// SchemaType is the possible schema definition types.
//
// Values are converted directly to and from pb.Schema_Type, so the constants
// below must stay numerically aligned with that enum.
type SchemaType pb.Schema_Type

const (
	// SchemaTypeUnspecified is the unused default value.
	SchemaTypeUnspecified SchemaType = 0
	// SchemaProtocolBuffer is a protobuf schema definition.
	SchemaProtocolBuffer SchemaType = 1
	// SchemaAvro is an Avro schema definition.
	SchemaAvro SchemaType = 2
)
72
// SchemaView is a view of Schema object fields to be returned
// by GetSchema and ListSchemas.
//
// Values are converted directly to pb.SchemaView when requests are built, so
// the constants below must stay numerically aligned with that enum.
type SchemaView pb.SchemaView

const (
	// SchemaViewUnspecified is the default/unset value.
	SchemaViewUnspecified SchemaView = 0
	// SchemaViewBasic includes the name and type of the schema, but not the definition.
	SchemaViewBasic SchemaView = 1
	// SchemaViewFull includes all Schema object fields.
	SchemaViewFull SchemaView = 2
)
85
// SchemaSettings are settings for validating messages
// published against a schema.
//
// Schema identifies the schema and is copied verbatim to and from the
// protobuf SchemaSettings message. Encoding is the encoding expected for
// messages.
type SchemaSettings struct {
	Schema   string
	Encoding SchemaEncoding
}
92
93func schemaSettingsToProto(schema *SchemaSettings) *pb.SchemaSettings {
94	if schema == nil {
95		return nil
96	}
97	return &pb.SchemaSettings{
98		Schema:   schema.Schema,
99		Encoding: pb.Encoding(schema.Encoding),
100	}
101}
102
103func protoToSchemaSettings(pbs *pb.SchemaSettings) *SchemaSettings {
104	if pbs == nil {
105		return nil
106	}
107	return &SchemaSettings{
108		Schema:   pbs.Schema,
109		Encoding: SchemaEncoding(pbs.Encoding),
110	}
111}
112
// SchemaEncoding is the encoding expected for messages.
//
// Values are converted directly to and from pb.Encoding, so the constants
// below must stay numerically aligned with that enum.
type SchemaEncoding pb.Encoding

const (
	// EncodingUnspecified is the default unused value.
	EncodingUnspecified SchemaEncoding = 0
	// EncodingJSON is the JSON encoding type for a message.
	EncodingJSON SchemaEncoding = 1
	// EncodingBinary is the binary encoding type for a message.
	// For some schema types, binary encoding may not be available.
	EncodingBinary SchemaEncoding = 2
)
125
126func (s *SchemaConfig) toProto() *pb.Schema {
127	pbs := &pb.Schema{
128		Name:       s.Name,
129		Type:       pb.Schema_Type(s.Type),
130		Definition: s.Definition,
131	}
132	return pbs
133}
134
135func protoToSchemaConfig(pbs *pb.Schema) *SchemaConfig {
136	return &SchemaConfig{
137		Name:       pbs.Name,
138		Type:       SchemaType(pbs.Type),
139		Definition: pbs.Definition,
140	}
141}
142
143// CreateSchema creates a new schema with the given schemaID
144// and config. Schemas cannot be updated after creation.
145func (c *SchemaClient) CreateSchema(ctx context.Context, schemaID string, s SchemaConfig) (*SchemaConfig, error) {
146	req := &pb.CreateSchemaRequest{
147		Parent:   fmt.Sprintf("projects/%s", c.projectID),
148		Schema:   s.toProto(),
149		SchemaId: schemaID,
150	}
151	pbs, err := c.sc.CreateSchema(ctx, req)
152	if err != nil {
153		return nil, err
154	}
155	return protoToSchemaConfig(pbs), nil
156}
157
158// Schema retrieves the configuration of a schema given a schemaID and a view.
159func (c *SchemaClient) Schema(ctx context.Context, schemaID string, view SchemaView) (*SchemaConfig, error) {
160	schemaPath := fmt.Sprintf("projects/%s/schemas/%s", c.projectID, schemaID)
161	req := &pb.GetSchemaRequest{
162		Name: schemaPath,
163		View: pb.SchemaView(view),
164	}
165	s, err := c.sc.GetSchema(ctx, req)
166	if err != nil {
167		return nil, err
168	}
169	return protoToSchemaConfig(s), nil
170}
171
172// Schemas returns an iterator which returns all of the schemas for the client's project.
173func (c *SchemaClient) Schemas(ctx context.Context, view SchemaView) *SchemaIterator {
174	return &SchemaIterator{
175		it: c.sc.ListSchemas(ctx, &pb.ListSchemasRequest{
176			Parent: fmt.Sprintf("projects/%s", c.projectID),
177			View:   pb.SchemaView(view),
178		}),
179	}
180}
181
// SchemaIterator is a struct used to iterate over schemas.
//
// It wraps the generated schema iterator (it). When err is non-nil, Next
// returns it instead of advancing the underlying iterator.
type SchemaIterator struct {
	it  *vkit.SchemaIterator
	err error
}
187
188// Next returns the next schema. If there are no more schemas, iterator.Done will be returned.
189func (s *SchemaIterator) Next() (*SchemaConfig, error) {
190	if s.err != nil {
191		return nil, s.err
192	}
193	pbs, err := s.it.Next()
194	if err != nil {
195		return nil, err
196	}
197	return protoToSchemaConfig(pbs), nil
198}
199
200// DeleteSchema deletes an existing schema given a schema ID.
201func (s *SchemaClient) DeleteSchema(ctx context.Context, schemaID string) error {
202	schemaPath := fmt.Sprintf("projects/%s/schemas/%s", s.projectID, schemaID)
203	return s.sc.DeleteSchema(ctx, &pb.DeleteSchemaRequest{
204		Name: schemaPath,
205	})
206}
207
// ValidateSchemaResult is the response for the ValidateSchema method.
// It currently has no fields and is reserved for future use.
type ValidateSchemaResult struct{}
211
212// ValidateSchema validates a schema config and returns an error if invalid.
213func (s *SchemaClient) ValidateSchema(ctx context.Context, schema SchemaConfig) (*ValidateSchemaResult, error) {
214	req := &pb.ValidateSchemaRequest{
215		Parent: fmt.Sprintf("projects/%s", s.projectID),
216		Schema: schema.toProto(),
217	}
218	_, err := s.sc.ValidateSchema(ctx, req)
219	if err != nil {
220		return nil, err
221	}
222	return &ValidateSchemaResult{}, nil
223}
224
// ValidateMessageResult is the response for the ValidateMessage methods
// (ValidateMessageWithConfig and ValidateMessageWithID).
// It currently has no fields and is reserved for future use.
type ValidateMessageResult struct{}
228
229// ValidateMessageWithConfig validates a message against an schema specified
230// by a schema config.
231func (s *SchemaClient) ValidateMessageWithConfig(ctx context.Context, msg []byte, encoding SchemaEncoding, config SchemaConfig) (*ValidateMessageResult, error) {
232	req := &pb.ValidateMessageRequest{
233		Parent: fmt.Sprintf("projects/%s", s.projectID),
234		SchemaSpec: &pb.ValidateMessageRequest_Schema{
235			Schema: config.toProto(),
236		},
237		Message:  msg,
238		Encoding: pb.Encoding(encoding),
239	}
240	_, err := s.sc.ValidateMessage(ctx, req)
241	if err != nil {
242		return nil, err
243	}
244	return &ValidateMessageResult{}, nil
245}
246
247// ValidateMessageWithID validates a message against an schema specified
248// by the schema ID of an existing schema.
249func (s *SchemaClient) ValidateMessageWithID(ctx context.Context, msg []byte, encoding SchemaEncoding, schemaID string) (*ValidateMessageResult, error) {
250	req := &pb.ValidateMessageRequest{
251		Parent: fmt.Sprintf("projects/%s", s.projectID),
252		SchemaSpec: &pb.ValidateMessageRequest_Name{
253			Name: fmt.Sprintf("projects/%s/schemas/%s", s.projectID, schemaID),
254		},
255		Message:  msg,
256		Encoding: pb.Encoding(encoding),
257	}
258	_, err := s.sc.ValidateMessage(ctx, req)
259	if err != nil {
260		return nil, err
261	}
262	return &ValidateMessageResult{}, nil
263}
264