// This file and its contents are licensed under the Apache License 2.0.
// Please see the included NOTICE for copyright information and
// LICENSE for a copy of the license.

package ingestor

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/timescale/promscale/pkg/log"
	"github.com/timescale/promscale/pkg/pgmodel/cache"
	"github.com/timescale/promscale/pkg/pgmodel/common/schema"
	"github.com/timescale/promscale/pkg/pgmodel/metrics"
	"github.com/timescale/promscale/pkg/pgmodel/model"
	"github.com/timescale/promscale/pkg/pgxconn"
	tput "github.com/timescale/promscale/pkg/util/throughput"
)

const (
	// MetricBatcherChannelCap is the buffer capacity of each per-metric
	// batcher channel created by getMetricBatcher.
	MetricBatcherChannelCap = 1000
	// finalizeMetricCreation is the statement executed by
	// CompleteMetricCreation.
	finalizeMetricCreation = "CALL " + schema.Catalog + ".finalize_metric_creation()"
	// getEpochSQL reads the current series epoch from the database; it is
	// used by getServerEpoch.
	getEpochSQL = "SELECT current_epoch FROM " + schema.Catalog + ".ids_epoch LIMIT 1"
)

// pgxDispatcher redirects incoming samples to the appropriate metricBatcher
// corresponding to the metric in the sample.
31type pgxDispatcher struct { 32 conn pgxconn.PgxConn 33 metricTableNames cache.MetricCache 34 scache cache.SeriesCache 35 exemplarKeyPosCache cache.PositionCache 36 batchers sync.Map 37 completeMetricCreation chan struct{} 38 asyncAcks bool 39 copierReadRequestCh chan<- readRequest 40 seriesEpochRefresh *time.Ticker 41 doneChannel chan struct{} 42 doneWG sync.WaitGroup 43 labelArrayOID uint32 44} 45 46func newPgxDispatcher(conn pgxconn.PgxConn, cache cache.MetricCache, scache cache.SeriesCache, eCache cache.PositionCache, cfg *Cfg) (*pgxDispatcher, error) { 47 numCopiers := cfg.NumCopiers 48 if numCopiers < 1 { 49 log.Warn("msg", "num copiers less than 1, setting to 1") 50 numCopiers = 1 51 } 52 53 //the copier read request channel keep the queue order 54 //between metrucs 55 maxMetrics := 10000 56 copierReadRequestCh := make(chan readRequest, maxMetrics) 57 setCopierChannelToMonitor(copierReadRequestCh) 58 59 if cfg.IgnoreCompressedChunks { 60 // Handle decompression to not decompress anything. 
61 handleDecompression = skipDecompression 62 } 63 64 if err := model.RegisterCustomPgTypes(conn); err != nil { 65 return nil, fmt.Errorf("registering custom pg types: %w", err) 66 } 67 68 labelArrayOID := model.GetCustomTypeOID(model.LabelArray) 69 sw := NewSeriesWriter(conn, labelArrayOID) 70 elf := NewExamplarLabelFormatter(conn, eCache) 71 72 for i := 0; i < numCopiers; i++ { 73 go runCopier(conn, copierReadRequestCh, sw, elf) 74 } 75 76 inserter := &pgxDispatcher{ 77 conn: conn, 78 metricTableNames: cache, 79 scache: scache, 80 exemplarKeyPosCache: eCache, 81 completeMetricCreation: make(chan struct{}, 1), 82 asyncAcks: cfg.AsyncAcks, 83 copierReadRequestCh: copierReadRequestCh, 84 // set to run at half our deletion interval 85 seriesEpochRefresh: time.NewTicker(30 * time.Minute), 86 doneChannel: make(chan struct{}), 87 } 88 runBatchWatcher(inserter.doneChannel) 89 90 //on startup run a completeMetricCreation to recover any potentially 91 //incomplete metric 92 if err := inserter.CompleteMetricCreation(); err != nil { 93 return nil, err 94 } 95 96 go inserter.runCompleteMetricCreationWorker() 97 98 if !cfg.DisableEpochSync { 99 inserter.doneWG.Add(1) 100 go func() { 101 defer inserter.doneWG.Done() 102 inserter.runSeriesEpochSync() 103 }() 104 } 105 return inserter, nil 106} 107 108func (p *pgxDispatcher) runCompleteMetricCreationWorker() { 109 for range p.completeMetricCreation { 110 err := p.CompleteMetricCreation() 111 if err != nil { 112 log.Warn("msg", "Got an error finalizing metric", "err", err) 113 } 114 } 115} 116 117func (p *pgxDispatcher) runSeriesEpochSync() { 118 epoch, err := p.refreshSeriesEpoch(model.InvalidSeriesEpoch) 119 // we don't have any great place to report errors, and if the 120 // connection recovers we can still make progress, so we'll just log it 121 // and continue execution 122 if err != nil { 123 log.Error("msg", "error refreshing the series cache", "err", err) 124 } 125 for { 126 select { 127 case <-p.seriesEpochRefresh.C: 128 
epoch, err = p.refreshSeriesEpoch(epoch) 129 if err != nil { 130 log.Error("msg", "error refreshing the series cache", "err", err) 131 } 132 case <-p.doneChannel: 133 return 134 } 135 } 136} 137 138func (p *pgxDispatcher) refreshSeriesEpoch(existingEpoch model.SeriesEpoch) (model.SeriesEpoch, error) { 139 dbEpoch, err := p.getServerEpoch() 140 if err != nil { 141 // Trash the cache just in case an epoch change occurred, seems safer 142 p.scache.Reset() 143 return model.InvalidSeriesEpoch, err 144 } 145 if existingEpoch == model.InvalidSeriesEpoch || dbEpoch != existingEpoch { 146 p.scache.Reset() 147 } 148 return dbEpoch, nil 149} 150 151func (p *pgxDispatcher) getServerEpoch() (model.SeriesEpoch, error) { 152 var newEpoch int64 153 row := p.conn.QueryRow(context.Background(), getEpochSQL) 154 err := row.Scan(&newEpoch) 155 if err != nil { 156 return -1, err 157 } 158 159 return model.SeriesEpoch(newEpoch), nil 160} 161 162func (p *pgxDispatcher) CompleteMetricCreation() error { 163 _, err := p.conn.Exec( 164 context.Background(), 165 finalizeMetricCreation, 166 ) 167 return err 168} 169 170func (p *pgxDispatcher) Close() { 171 close(p.completeMetricCreation) 172 p.batchers.Range(func(key, value interface{}) bool { 173 close(value.(chan *insertDataRequest)) 174 return true 175 }) 176 close(p.copierReadRequestCh) 177 close(p.doneChannel) 178 p.doneWG.Wait() 179} 180 181// InsertTs inserts a batch of data into the database. 182// The data should be grouped by metric name. 183// returns the number of rows we intended to insert (_not_ how many were 184// actually inserted) and any error. 185// Though we may insert data to multiple tables concurrently, if asyncAcks is 186// unset this function will wait until _all_ the insert attempts have completed. 
187func (p *pgxDispatcher) InsertTs(dataTS model.Data) (uint64, error) { 188 var ( 189 numRows uint64 190 maxt int64 191 rows = dataTS.Rows 192 workFinished = new(sync.WaitGroup) 193 ) 194 workFinished.Add(len(rows)) 195 // we only allocate enough space for a single error message here as we only 196 // report one error back upstream. The inserter should not block on this 197 // channel, but only insert if it's empty, anything else can deadlock. 198 errChan := make(chan error, 1) 199 for metricName, data := range rows { 200 for _, insertable := range data { 201 numRows += uint64(insertable.Count()) 202 ts := insertable.MaxTs() 203 if maxt < ts { 204 maxt = ts 205 } 206 } 207 // the following is usually non-blocking, just a channel insert 208 p.getMetricBatcher(metricName) <- &insertDataRequest{metric: metricName, data: data, finished: workFinished, errChan: errChan} 209 } 210 reportIncomingBatch(numRows) 211 reportOutgoing := func() { 212 reportOutgoingBatch(numRows) 213 reportBatchProcessingTime(dataTS.ReceivedTime) 214 } 215 216 var err error 217 if !p.asyncAcks { 218 workFinished.Wait() 219 reportOutgoing() 220 select { 221 case err = <-errChan: 222 default: 223 } 224 postIngestTasks(maxt, numRows, 0) 225 close(errChan) 226 } else { 227 go func() { 228 workFinished.Wait() 229 reportOutgoing() 230 select { 231 case err = <-errChan: 232 default: 233 } 234 close(errChan) 235 if err != nil { 236 log.Error("msg", fmt.Sprintf("error on async send, dropping %d datapoints", numRows), "err", err) 237 } 238 postIngestTasks(maxt, numRows, 0) 239 }() 240 } 241 242 return numRows, err 243} 244 245func (p *pgxDispatcher) InsertMetadata(metadata []model.Metadata) (uint64, error) { 246 totalRows := uint64(len(metadata)) 247 insertedRows, err := insertMetadata(p.conn, metadata) 248 if err != nil { 249 return insertedRows, err 250 } 251 postIngestTasks(0, 0, insertedRows) 252 if totalRows != insertedRows { 253 return insertedRows, fmt.Errorf("failed to insert all metadata: 
inserted %d rows out of %d rows in total", insertedRows, totalRows) 254 } 255 return insertedRows, nil 256} 257 258// postIngestTasks performs a set of tasks that are due after ingesting series data. 259func postIngestTasks(maxTs int64, numSamples, numMetadata uint64) { 260 tput.ReportDataProcessed(maxTs, numSamples, numMetadata) 261 262 // Max_sent_timestamp stats. 263 if maxTs < atomic.LoadInt64(&MaxSentTimestamp) { 264 return 265 } 266 atomic.StoreInt64(&MaxSentTimestamp, maxTs) 267 metrics.MaxSentTimestamp.Set(float64(maxTs)) 268} 269 270// Get the handler for a given metric name, creating a new one if none exists 271func (p *pgxDispatcher) getMetricBatcher(metric string) chan<- *insertDataRequest { 272 batcher, ok := p.batchers.Load(metric) 273 if !ok { 274 // The ordering is important here: we need to ensure that every call 275 // to getMetricInserter() returns the same inserter. Therefore, we can 276 // only start up the inserter routine if we know that we won the race 277 // to create the inserter, anything else will leave a zombie inserter 278 // lying around. 279 c := make(chan *insertDataRequest, MetricBatcherChannelCap) 280 actual, old := p.batchers.LoadOrStore(metric, c) 281 batcher = actual 282 if !old { 283 go runMetricBatcher(p.conn, c, metric, p.completeMetricCreation, p.metricTableNames, p.copierReadRequestCh, p.labelArrayOID) 284 } 285 } 286 ch := batcher.(chan *insertDataRequest) 287 MetricBatcherChLen.Observe(float64(len(ch))) 288 return ch 289} 290 291type insertDataRequest struct { 292 metric string 293 finished *sync.WaitGroup 294 data []model.Insertable 295 errChan chan error 296} 297 298func (idr *insertDataRequest) reportResult(err error) { 299 if err != nil { 300 select { 301 case idr.errChan <- err: 302 default: 303 } 304 } 305 idr.finished.Done() 306} 307