1// Copyright 2013 M-Lab 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15// +build appengine 16 17package rtt 18 19import ( 20 "appengine" 21 "appengine/datastore" 22 "appengine/memcache" 23 "appengine/taskqueue" 24 "fmt" 25 "net/url" 26 "time" 27) 28 29const ( 30 MaxDSReadPerQuery = 1000 31 MaxDSWritePerQuery = 500 32) 33 34// dsReadChunk is a structure with which new ClientGroup lists can be split into 35// lengths <= MaxDSReadPerQuery such that datastore.GetMulti works. 36type dsReadChunk struct { 37 keys []*datastore.Key 38 cgs []*ClientGroup 39} 40 41// len returns the length of the slice *dsReadChunk.keys. 42func (c *dsReadChunk) len() int { 43 return len(c.keys) 44} 45 46// newDSReadChunk returns a new *dsReadChunk with the keys and cgs slices made. 47func newDSReadChunk() *dsReadChunk { 48 return &dsReadChunk{ 49 keys: make([]*datastore.Key, 0, MaxDSReadPerQuery), 50 cgs: make([]*ClientGroup, 0, MaxDSReadPerQuery), 51 } 52} 53 54// divideIntoDSReadChunks divides GetMulti operations into MaxDSReadPerQuery 55// sized operations to adhere with GAE limits for a given map[string]*ClientGroup. 56func divideIntoDSReadChunks(c appengine.Context, newcgs map[string]*ClientGroup) []*dsReadChunk { 57 chunks := make([]*dsReadChunk, 0) 58 chunk := newDSReadChunk() 59 60 parentKey := DatastoreParentKey(c) 61 for cgStr, cg := range newcgs { 62 // Add into chunk 63 chunk.keys = append(chunk.keys, datastore.NewKey(c, "ClientGroup", cgStr, 0, parentKey)) 64 chunk.cgs = append(chunk.cgs, cg) 65 66 // Make sure read chunks are only as large as MaxDSReadPerQuery. 67 // Create new chunk if size reached. 68 if chunk.len() == MaxDSReadPerQuery { 69 chunks = append(chunks, chunk) 70 chunk = newDSReadChunk() 71 } 72 } 73 return chunks 74} 75 76// dsWriteChunk is a structure with which new ClientGroup lists can be split 77// into lengths <= MaxDSWritePerQuery such that datastore.PutMulti works. 78type dsWriteChunk struct { 79 keys []*datastore.Key 80 cgs []ClientGroup 81} 82 83// len returns the length of the slice *dsWriteChunk.keys. 84func (c *dsWriteChunk) len() int { 85 return len(c.keys) 86} 87 88// newDSWriteChunk returns a new *dsWriteChunk with the keys and cgs slices made. 89func newDSWriteChunk() *dsWriteChunk { 90 return &dsWriteChunk{ 91 keys: make([]*datastore.Key, 0, MaxDSWritePerQuery), 92 cgs: make([]ClientGroup, 0, MaxDSWritePerQuery), 93 } 94} 95 96// putQueueRequest keeps track of a queue for datastore.PutMulti requests, as 97// well as the total number of Puts done. 98type putQueueRequest struct { 99 key *datastore.Key 100 cg *ClientGroup 101 queue *dsWriteChunk 102 putN int 103} 104 105// add places a newly updated ClientGroup in a PutMulti queue. This queue is 106// later processed by putQueueRequest.process. 107func (r *putQueueRequest) add(c appengine.Context, dateStr string, k *datastore.Key, cg *ClientGroup) { 108 if r.queue == nil || r.queue.keys == nil { 109 r.queue = newDSWriteChunk() 110 } 111 112 r.queue.keys = append(r.queue.keys, k) 113 r.queue.cgs = append(r.queue.cgs, *cg) 114 115 if r.queue.len() == MaxDSWritePerQuery { 116 r.process(c, dateStr) 117 } 118} 119 120// process processes a queue of newly updated ClientGroups. This is done so that 121// MaxDSWritePerQuery no. of Puts can be done to reduce the number of queries to 122// datastore and therefore the time taken to Put all changes to datastore. 123func (r *putQueueRequest) process(c appengine.Context, dateStr string) { 124 if r.queue == nil || r.queue.len() == 0 { // Don't process further if nothing to process 125 return 126 } 127 n := r.queue.len() 128 129 r.putN += n 130 c.Infof("rtt: Submitting put tasks for %v records. (Total: %d rows)", n, r.putN) 131 132 addTaskClientGroupPut(c, dateStr, r.queue.cgs) 133 r.queue = newDSWriteChunk() 134} 135 136// addTaskClientGroupPut receives a list of ClientGroups to put into datastore 137// and stores it temporarily into memcache. It then submits the key as a 138// taskqueue task. 139func addTaskClientGroupPut(c appengine.Context, dateStr string, cgs []ClientGroup) { 140 // Create unique key for memcache 141 key := cgMemcachePutKey() 142 143 // Store CGs into memcache 144 item := &memcache.Item{ 145 Key: key, 146 Object: cgs, 147 } 148 if err := memcache.Gob.Set(c, item); err != nil { 149 c.Errorf("rtt.addTaskClientGroupPut:memcache.Set: %s", err) 150 return 151 } 152 153 // Submit taskqueue task 154 values := make(url.Values) 155 values.Add(FormKeyPutKey, key) 156 values.Add(FormKeyImportDate, dateStr) 157 task := taskqueue.NewPOSTTask(URLTaskImportPut, values) 158 _, err := taskqueue.Add(c, task, TaskQueueNameImportPut) 159 if err != nil { 160 c.Errorf("rtt.addTaskClientGroupPut:taskqueue.Add: %s", err) 161 return 162 } 163} 164 165// DatastoreParentKey returns a datastore key to use as a parent key for rtt 166// related datastore entries. 167func DatastoreParentKey(c appengine.Context) *datastore.Key { 168 return datastore.NewKey(c, "string", "rtt", 0, nil) 169} 170 171// cgMemcachePutKey generates a memcache key string for use in Put operations 172// when []ClientGroup is cached into memcache. 173func cgMemcachePutKey() string { 174 ns := time.Now().UnixNano() 175 return fmt.Sprintf("rtt:bqImport:Put:%d", ns) 176} 177