1// Copyright 2013 M-Lab
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// +build appengine
16
17package rtt
18
19import (
20	"appengine"
21	"appengine/datastore"
22	"appengine/memcache"
23	"appengine/taskqueue"
24	"fmt"
25	"net/url"
26	"time"
27)
28
// Per-call datastore limits imposed by Google App Engine: a single
// datastore.GetMulti may fetch at most 1000 entities, and a single
// datastore.PutMulti may write at most 500 entities. Chunking below
// keeps every batch within these bounds.
const (
	MaxDSReadPerQuery  = 1000
	MaxDSWritePerQuery = 500
)
33
34// dsReadChunk is a structure with which new ClientGroup lists can be split into
35// lengths <= MaxDSReadPerQuery such that datastore.GetMulti works.
// dsReadChunk is a structure with which new ClientGroup lists can be split into
// lengths <= MaxDSReadPerQuery such that datastore.GetMulti works.
// keys and cgs are parallel slices: keys[i] is the datastore key for cgs[i].
type dsReadChunk struct {
	keys []*datastore.Key // datastore keys to fetch
	cgs  []*ClientGroup   // destination ClientGroups, same index as keys
}
40
41// len returns the length of the slice *dsReadChunk.keys.
42func (c *dsReadChunk) len() int {
43	return len(c.keys)
44}
45
46// newDSReadChunk returns a new *dsReadChunk with the keys and cgs slices made.
47func newDSReadChunk() *dsReadChunk {
48	return &dsReadChunk{
49		keys: make([]*datastore.Key, 0, MaxDSReadPerQuery),
50		cgs:  make([]*ClientGroup, 0, MaxDSReadPerQuery),
51	}
52}
53
54// divideIntoDSReadChunks divides GetMulti operations into MaxDSReadPerQuery
55// sized operations to adhere with GAE limits for a given map[string]*ClientGroup.
56func divideIntoDSReadChunks(c appengine.Context, newcgs map[string]*ClientGroup) []*dsReadChunk {
57	chunks := make([]*dsReadChunk, 0)
58	chunk := newDSReadChunk()
59
60	parentKey := DatastoreParentKey(c)
61	for cgStr, cg := range newcgs {
62		// Add into chunk
63		chunk.keys = append(chunk.keys, datastore.NewKey(c, "ClientGroup", cgStr, 0, parentKey))
64		chunk.cgs = append(chunk.cgs, cg)
65
66		// Make sure read chunks are only as large as MaxDSReadPerQuery.
67		// Create new chunk if size reached.
68		if chunk.len() == MaxDSReadPerQuery {
69			chunks = append(chunks, chunk)
70			chunk = newDSReadChunk()
71		}
72	}
73	return chunks
74}
75
76// dsWriteChunk is a structure with which new ClientGroup lists can be split
77// into lengths <= MaxDSWritePerQuery such that datastore.PutMulti works.
// dsWriteChunk is a structure with which new ClientGroup lists can be split
// into lengths <= MaxDSWritePerQuery such that datastore.PutMulti works.
// keys and cgs are parallel slices: keys[i] is the datastore key for cgs[i].
// Note cgs holds values (not pointers), matching what PutMulti-style batching
// via addTaskClientGroupPut consumes.
type dsWriteChunk struct {
	keys []*datastore.Key // datastore keys to write
	cgs  []ClientGroup    // ClientGroup values to write, same index as keys
}
82
83// len returns the length of the slice *dsWriteChunk.keys.
84func (c *dsWriteChunk) len() int {
85	return len(c.keys)
86}
87
88// newDSWriteChunk returns a new *dsWriteChunk with the keys and cgs slices made.
89func newDSWriteChunk() *dsWriteChunk {
90	return &dsWriteChunk{
91		keys: make([]*datastore.Key, 0, MaxDSWritePerQuery),
92		cgs:  make([]ClientGroup, 0, MaxDSWritePerQuery),
93	}
94}
95
96// putQueueRequest keeps track of a queue for datastore.PutMulti requests, as
97// well as the total number of Puts done.
// putQueueRequest keeps track of a queue for datastore.PutMulti requests, as
// well as the total number of Puts done.
type putQueueRequest struct {
	// NOTE(review): key and cg are not referenced anywhere in this file's
	// visible code (add/process use their own parameters) — confirm whether
	// other files use them before removing.
	key   *datastore.Key
	cg    *ClientGroup
	queue *dsWriteChunk // pending entries, flushed once MaxDSWritePerQuery is reached
	putN  int           // running total of entries submitted via process
}
104
105// add places a newly updated ClientGroup in a PutMulti queue. This queue is
106// later processed by putQueueRequest.process.
107func (r *putQueueRequest) add(c appengine.Context, dateStr string, k *datastore.Key, cg *ClientGroup) {
108	if r.queue == nil || r.queue.keys == nil {
109		r.queue = newDSWriteChunk()
110	}
111
112	r.queue.keys = append(r.queue.keys, k)
113	r.queue.cgs = append(r.queue.cgs, *cg)
114
115	if r.queue.len() == MaxDSWritePerQuery {
116		r.process(c, dateStr)
117	}
118}
119
120// process processes a queue of newly updated ClientGroups. This is done so that
121// MaxDSWritePerQuery no. of Puts can be done to reduce the number of queries to
122// datastore and therefore the time taken to Put all changes to datastore.
123func (r *putQueueRequest) process(c appengine.Context, dateStr string) {
124	if r.queue == nil || r.queue.len() == 0 { // Don't process further if nothing to process
125		return
126	}
127	n := r.queue.len()
128
129	r.putN += n
130	c.Infof("rtt: Submitting put tasks for %v records. (Total: %d rows)", n, r.putN)
131
132	addTaskClientGroupPut(c, dateStr, r.queue.cgs)
133	r.queue = newDSWriteChunk()
134}
135
136// addTaskClientGroupPut receives a list of ClientGroups to put into datastore
137// and stores it temporarily into memcache. It then submits the key as a
138// taskqueue task.
139func addTaskClientGroupPut(c appengine.Context, dateStr string, cgs []ClientGroup) {
140	// Create unique key for memcache
141	key := cgMemcachePutKey()
142
143	// Store CGs into memcache
144	item := &memcache.Item{
145		Key:    key,
146		Object: cgs,
147	}
148	if err := memcache.Gob.Set(c, item); err != nil {
149		c.Errorf("rtt.addTaskClientGroupPut:memcache.Set: %s", err)
150		return
151	}
152
153	// Submit taskqueue task
154	values := make(url.Values)
155	values.Add(FormKeyPutKey, key)
156	values.Add(FormKeyImportDate, dateStr)
157	task := taskqueue.NewPOSTTask(URLTaskImportPut, values)
158	_, err := taskqueue.Add(c, task, TaskQueueNameImportPut)
159	if err != nil {
160		c.Errorf("rtt.addTaskClientGroupPut:taskqueue.Add: %s", err)
161		return
162	}
163}
164
165// DatastoreParentKey returns a datastore key to use as a parent key for rtt
166// related datastore entries.
// DatastoreParentKey returns a datastore key to use as a parent key for rtt
// related datastore entries, grouping them under one entity group.
// NOTE(review): the entity kind is literally "string" (with string ID "rtt");
// this looks intentional as a synthetic ancestor key, but confirm it matches
// the kind used elsewhere in the datastore schema.
func DatastoreParentKey(c appengine.Context) *datastore.Key {
	return datastore.NewKey(c, "string", "rtt", 0, nil)
}
170
171// cgMemcachePutKey generates a memcache key string for use in Put operations
172// when []ClientGroup is cached into memcache.
// cgMemcachePutKey generates a memcache key string for use in Put operations
// when []ClientGroup is cached into memcache. The current wall-clock time in
// nanoseconds is embedded so each generated key is effectively unique.
func cgMemcachePutKey() string {
	return fmt.Sprintf("rtt:bqImport:Put:%d", time.Now().UnixNano())
}
177