1// This file and its contents are licensed under the Apache License 2.0. 2// Please see the included NOTICE for copyright information and 3// LICENSE for a copy of the license. 4 5package ingestor 6 7import ( 8 "context" 9 "fmt" 10 11 "github.com/timescale/promscale/pkg/log" 12 "github.com/timescale/promscale/pkg/pgmodel/cache" 13 pgErrors "github.com/timescale/promscale/pkg/pgmodel/common/errors" 14 "github.com/timescale/promscale/pkg/pgmodel/common/schema" 15 "github.com/timescale/promscale/pkg/pgmodel/model" 16 "github.com/timescale/promscale/pkg/pgxconn" 17) 18 19const getCreateMetricsTableWithNewSQL = "SELECT table_name, possibly_new FROM " + schema.Catalog + ".get_or_create_metric_table_name($1)" 20const createExemplarTable = "SELECT * FROM " + schema.Catalog + ".create_exemplar_table_if_not_exists($1)" 21 22func containsExemplars(data []model.Insertable) bool { 23 for _, row := range data { 24 if row.IsOfType(model.Exemplar) { 25 return true 26 } 27 } 28 return false 29} 30 31type readRequest struct { 32 copySender <-chan copyRequest 33} 34 35func metricTableName(conn pgxconn.PgxConn, metric string) (string, bool, error) { 36 res, err := conn.Query( 37 context.Background(), 38 getCreateMetricsTableWithNewSQL, 39 metric, 40 ) 41 42 if err != nil { 43 return "", true, fmt.Errorf("failed to get the table name for metric %s: %w", metric, err) 44 } 45 46 var tableName string 47 var possiblyNew bool 48 defer res.Close() 49 if !res.Next() { 50 if err := res.Err(); err != nil { 51 return "", true, fmt.Errorf("failed to get the table name for metric %s: %w", metric, err) 52 } 53 return "", true, pgErrors.ErrMissingTableName 54 } 55 56 if err := res.Scan(&tableName, &possiblyNew); err != nil { 57 return "", true, fmt.Errorf("failed to get the table name for metric %s: %w", metric, err) 58 } 59 60 if err := res.Err(); err != nil { 61 return "", true, fmt.Errorf("failed to get the table name for metric %s: %w", metric, err) 62 } 63 64 if tableName == "" { 65 return "", true, fmt.Errorf("failed to get the table name for metric %s: empty table name returned", metric) 66 } 67 68 return tableName, possiblyNew, nil 69} 70 71// Create the metric table for the metric we handle, if it does not already 72// exist. This only does the most critical part of metric table creation, the 73// rest is handled by completeMetricTableCreation(). 74func initializeMetricBatcher(conn pgxconn.PgxConn, metricName string, completeMetricCreationSignal chan struct{}, metricTableNames cache.MetricCache) (tableName string, err error) { 75 // Metric batchers are always initialized with metric names of samples and not of exemplars. 76 mInfo, err := metricTableNames.Get(schema.Data, metricName, false) 77 if err == nil && mInfo.TableName != "" { 78 return mInfo.TableName, nil 79 } 80 81 tableName, possiblyNew, err := metricTableName(conn, metricName) 82 if err != nil || tableName == "" { 83 return "", err 84 } 85 86 // We ignore error here since this is just an optimization. 87 // 88 // Always set metric name while initializing with exemplars as false, since 89 // the metric name set here is via fetching the metric name from metric table. 90 // 91 // Metric table is filled during start, but exemplar table is filled when we 92 // first see an exemplar. Hence, that's the place to sent isExemplar as true. 93 _ = metricTableNames.Set( 94 schema.Data, 95 metricName, 96 model.MetricInfo{ 97 TableSchema: schema.Data, TableName: tableName, 98 SeriesTable: tableName, // Series table name is always the same for raw metrics. 99 }, 100 false, 101 ) 102 103 if possiblyNew { 104 //pass a signal if there is space 105 select { 106 case completeMetricCreationSignal <- struct{}{}: 107 default: 108 } 109 } 110 return tableName, err 111} 112 113// initilizeExemplars creates the necessary tables for exemplars. Called lazily only if exemplars are found 114func initializeExemplars(conn pgxconn.PgxConn, metricName string) error { 115 // We are seeing the exemplar belonging to this metric first time. It may be the 116 // first time of this exemplar in the database. So, let's attempt to create a table 117 // if it does not exists. 118 var created bool 119 err := conn.QueryRow(context.Background(), createExemplarTable, metricName).Scan(&created) 120 if err != nil { 121 return fmt.Errorf("error initializing exemplar tables for %s: %w", metricName, err) 122 } 123 return nil 124} 125 126func runMetricBatcher(conn pgxconn.PgxConn, 127 input chan *insertDataRequest, 128 metricName string, 129 completeMetricCreationSignal chan struct{}, 130 metricTableNames cache.MetricCache, 131 copierReadRequestCh chan<- readRequest, 132 labelArrayOID uint32) { 133 134 var ( 135 tableName string 136 firstReq *insertDataRequest 137 firstReqSet = false 138 exemplarsInitialized = false 139 ) 140 141 addReq := func(req *insertDataRequest, buf *pendingBuffer) { 142 if !exemplarsInitialized && containsExemplars(req.data) { 143 if err := initializeExemplars(conn, metricName); err != nil { 144 log.Error("msg", err) 145 req.reportResult(err) 146 return 147 } 148 exemplarsInitialized = true 149 } 150 buf.addReq(req) 151 } 152 //This channel in synchronous (no buffering). This provides backpressure 153 //to the batcher to keep batching until the copier is ready to read. 154 copySender := make(chan copyRequest) 155 defer close(copySender) 156 readRequest := readRequest{copySender: copySender} 157 158 for firstReq = range input { 159 var err error 160 tableName, err = initializeMetricBatcher(conn, metricName, completeMetricCreationSignal, metricTableNames) 161 if err != nil { 162 err := fmt.Errorf("initializing the insert routine for metric %v has failed with %w", metricName, err) 163 log.Error("msg", err) 164 firstReq.reportResult(err) 165 } else { 166 firstReqSet = true 167 break 168 } 169 } 170 171 //input channel was closed before getting a successful request 172 if !firstReqSet { 173 return 174 } 175 176 //the basic structure of communication from the batcher to the copier is as follows: 177 // 1. the batcher gets a request from the input channel 178 // 2. the batcher sends a readRequest to the copier on a channel shared by all the metric batchers 179 // 3. the batcher keeps on batching together requests from the input channel as long as the copier isn't ready to receive the batch 180 // 4. the copier catches up and gets the read request from the shared channel 181 // 5. the copier reads from the channel specified in the read request 182 // 6. the batcher is able to send it's batch to the copier 183 184 // Notice some properties: 185 // 1. The shared channel in step 2 acts as a queue between metric batchers where the priority is approximately the earliest arrival time of any 186 // request in the batch (that's why we only do step 2 after step 1). Note this means we probably want a single copier reading a batch 187 // of requests consecutively so as to minimize processing delays. That's what the mutex in the copier does. 188 // 2. There is an auto-adjusting adaptation loop in step 3. The longer the copier takes to catch up to the readRequest in the queue, the more things will be batched 189 // 3. The batcher has only a single read request out at a time. 190 191 pending := NewPendingBuffer() 192 addReq(firstReq, pending) 193 copierReadRequestCh <- readRequest 194 195 for { 196 if pending.IsEmpty() { 197 req, ok := <-input 198 if !ok { 199 return 200 } 201 addReq(req, pending) 202 copierReadRequestCh <- readRequest 203 } 204 205 recvCh := input 206 if pending.IsFull() { 207 recvCh = nil 208 } 209 210 numSeries := pending.batch.CountSeries() 211 select { 212 //try to send first, if not then keep batching 213 case copySender <- copyRequest{pending, tableName}: 214 MetricBatcherFlushSeries.Observe(float64(numSeries)) 215 pending = NewPendingBuffer() 216 case req, ok := <-recvCh: 217 if !ok { 218 if !pending.IsEmpty() { 219 copySender <- copyRequest{pending, tableName} 220 MetricBatcherFlushSeries.Observe(float64(numSeries)) 221 } 222 return 223 } 224 addReq(req, pending) 225 } 226 } 227} 228