1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
#include "src/trace_processor/sqlite/span_join_operator_table.h"

#include <sqlite3.h>
#include <string.h>

#include <algorithm>
#include <limits>
#include <set>
#include <string>
#include <tuple>
#include <utility>
#include <vector>

#include "perfetto/base/logging.h"
#include "perfetto/ext/base/string_splitter.h"
#include "perfetto/ext/base/string_utils.h"
#include "perfetto/ext/base/string_view.h"
#include "src/trace_processor/sqlite/sqlite_utils.h"
31 
32 namespace perfetto {
33 namespace trace_processor {
34 
35 namespace {
36 
// Names of the two columns every table passed to SPAN_JOIN must have (checked
// in CreateTableDefinition). They are also always the first two columns of the
// span join's own schema (see Init).
constexpr char kTsColumnName[] = "ts";
constexpr char kDurColumnName[] = "dur";
39 
IsRequiredColumn(const std::string & name)40 bool IsRequiredColumn(const std::string& name) {
41   return name == kTsColumnName || name == kDurColumnName;
42 }
43 
HasDuplicateColumns(const std::vector<SqliteTable::Column> & cols)44 base::Optional<std::string> HasDuplicateColumns(
45     const std::vector<SqliteTable::Column>& cols) {
46   std::set<std::string> names;
47   for (const auto& col : cols) {
48     if (names.count(col.name()) > 0)
49       return col.name();
50     names.insert(col.name());
51   }
52   return base::nullopt;
53 }
54 
55 }  // namespace
56 
SpanJoinOperatorTable(sqlite3 * db,const TraceStorage *)57 SpanJoinOperatorTable::SpanJoinOperatorTable(sqlite3* db, const TraceStorage*)
58     : db_(db) {}
59 
RegisterTable(sqlite3 * db,const TraceStorage * storage)60 void SpanJoinOperatorTable::RegisterTable(sqlite3* db,
61                                           const TraceStorage* storage) {
62   SqliteTable::Register<SpanJoinOperatorTable>(db, storage, "span_join",
63                                                /* read_write */ false,
64                                                /* requires_args */ true);
65 
66   SqliteTable::Register<SpanJoinOperatorTable>(db, storage, "span_left_join",
67                                                /* read_write */ false,
68                                                /* requires_args */ true);
69 
70   SqliteTable::Register<SpanJoinOperatorTable>(db, storage, "span_outer_join",
71                                                /* read_write */ false,
72                                                /* requires_args */ true);
73 }
74 
Init(int argc,const char * const * argv,Schema * schema)75 util::Status SpanJoinOperatorTable::Init(int argc,
76                                          const char* const* argv,
77                                          Schema* schema) {
78   // argv[0] - argv[2] are SQLite populated fields which are always present.
79   if (argc < 5)
80     return util::Status("SPAN_JOIN: expected at least 2 args");
81 
82   TableDescriptor t1_desc;
83   auto status = TableDescriptor::Parse(
84       std::string(reinterpret_cast<const char*>(argv[3])), &t1_desc);
85   if (!status.ok())
86     return status;
87 
88   TableDescriptor t2_desc;
89   status = TableDescriptor::Parse(
90       std::string(reinterpret_cast<const char*>(argv[4])), &t2_desc);
91   if (!status.ok())
92     return status;
93 
94   // Check that the partition columns match between the two tables.
95   if (t1_desc.partition_col == t2_desc.partition_col) {
96     partitioning_ = t1_desc.IsPartitioned()
97                         ? PartitioningType::kSamePartitioning
98                         : PartitioningType::kNoPartitioning;
99     if (partitioning_ == PartitioningType::kNoPartitioning && IsOuterJoin()) {
100       return util::ErrStatus(
101           "SPAN_JOIN: Outer join not supported for no partition tables");
102     }
103   } else if (t1_desc.IsPartitioned() && t2_desc.IsPartitioned()) {
104     return util::ErrStatus(
105         "SPAN_JOIN: mismatching partitions between the two tables; "
106         "(partition %s in table %s, partition %s in table %s)",
107         t1_desc.partition_col.c_str(), t1_desc.name.c_str(),
108         t2_desc.partition_col.c_str(), t2_desc.name.c_str());
109   } else {
110     if (IsOuterJoin()) {
111       return util::ErrStatus(
112           "SPAN_JOIN: Outer join not supported for mixed partitioned tables");
113     }
114     partitioning_ = PartitioningType::kMixedPartitioning;
115   }
116 
117   bool t1_part_mixed = t1_desc.IsPartitioned() &&
118                        partitioning_ == PartitioningType::kMixedPartitioning;
119   bool t2_part_mixed = t2_desc.IsPartitioned() &&
120                        partitioning_ == PartitioningType::kMixedPartitioning;
121 
122   EmitShadowType t1_shadow_type;
123   if (IsOuterJoin()) {
124     if (t1_part_mixed || partitioning_ == PartitioningType::kNoPartitioning) {
125       t1_shadow_type = EmitShadowType::kPresentPartitionOnly;
126     } else {
127       t1_shadow_type = EmitShadowType::kAll;
128     }
129   } else {
130     t1_shadow_type = EmitShadowType::kNone;
131   }
132   status = CreateTableDefinition(t1_desc, t1_shadow_type, &t1_defn_);
133   if (!status.ok())
134     return status;
135 
136   EmitShadowType t2_shadow_type;
137   if (IsOuterJoin() || IsLeftJoin()) {
138     if (t2_part_mixed || partitioning_ == PartitioningType::kNoPartitioning) {
139       t2_shadow_type = EmitShadowType::kPresentPartitionOnly;
140     } else {
141       t2_shadow_type = EmitShadowType::kAll;
142     }
143   } else {
144     t2_shadow_type = EmitShadowType::kNone;
145   }
146   status = CreateTableDefinition(t2_desc, t2_shadow_type, &t2_defn_);
147   if (!status.ok())
148     return status;
149 
150   std::vector<SqliteTable::Column> cols;
151   // Ensure the shared columns are consistently ordered and are not
152   // present twice in the final schema
153   cols.emplace_back(Column::kTimestamp, kTsColumnName, SqlValue::Type::kLong);
154   cols.emplace_back(Column::kDuration, kDurColumnName, SqlValue::Type::kLong);
155   if (partitioning_ != PartitioningType::kNoPartitioning)
156     cols.emplace_back(Column::kPartition, partition_col(),
157                       SqlValue::Type::kLong);
158 
159   CreateSchemaColsForDefn(t1_defn_, &cols);
160   CreateSchemaColsForDefn(t2_defn_, &cols);
161 
162   if (auto opt_dupe_col = HasDuplicateColumns(cols)) {
163     return util::ErrStatus(
164         "SPAN_JOIN: column %s present in both tables %s and %s",
165         opt_dupe_col->c_str(), t1_defn_.name().c_str(),
166         t2_defn_.name().c_str());
167   }
168   std::vector<size_t> primary_keys = {Column::kTimestamp};
169   if (partitioning_ != PartitioningType::kNoPartitioning)
170     primary_keys.push_back(Column::kPartition);
171   *schema = Schema(cols, primary_keys);
172 
173   return util::OkStatus();
174 }
175 
CreateSchemaColsForDefn(const TableDefinition & defn,std::vector<SqliteTable::Column> * cols)176 void SpanJoinOperatorTable::CreateSchemaColsForDefn(
177     const TableDefinition& defn,
178     std::vector<SqliteTable::Column>* cols) {
179   for (size_t i = 0; i < defn.columns().size(); i++) {
180     const auto& n = defn.columns()[i].name();
181     if (IsRequiredColumn(n) || n == defn.partition_col())
182       continue;
183 
184     ColumnLocator* locator = &global_index_to_column_locator_[cols->size()];
185     locator->defn = &defn;
186     locator->col_index = i;
187 
188     cols->emplace_back(cols->size(), n, defn.columns()[i].type());
189   }
190 }
191 
CreateCursor()192 std::unique_ptr<SqliteTable::Cursor> SpanJoinOperatorTable::CreateCursor() {
193   return std::unique_ptr<SpanJoinOperatorTable::Cursor>(new Cursor(this, db_));
194 }
195 
BestIndex(const QueryConstraints & qc,BestIndexInfo * info)196 int SpanJoinOperatorTable::BestIndex(const QueryConstraints& qc,
197                                      BestIndexInfo* info) {
198   // TODO(lalitm): figure out cost estimation.
199   const auto& ob = qc.order_by();
200 
201   if (partitioning_ == PartitioningType::kNoPartitioning) {
202     // If both tables are not partitioned and we have a single order by on ts,
203     // we return data in the correct order.
204     info->sqlite_omit_order_by =
205         ob.size() == 1 && ob[0].iColumn == Column::kTimestamp && !ob[0].desc;
206   } else {
207     // If one of the tables is partitioned, and we have an order by on the
208     // partition column followed (optionally) by an order by on timestamp, we
209     // return data in the correct order.
210     bool is_first_ob_partition =
211         ob.size() >= 1 && ob[0].iColumn == Column::kPartition && !ob[0].desc;
212     bool is_second_ob_ts =
213         ob.size() >= 2 && ob[1].iColumn == Column::kTimestamp && !ob[1].desc;
214     info->sqlite_omit_order_by =
215         (ob.size() == 1 && is_first_ob_partition) ||
216         (ob.size() == 2 && is_first_ob_partition && is_second_ob_ts);
217   }
218   return SQLITE_OK;
219 }
220 
221 std::vector<std::string>
ComputeSqlConstraintsForDefinition(const TableDefinition & defn,const QueryConstraints & qc,sqlite3_value ** argv)222 SpanJoinOperatorTable::ComputeSqlConstraintsForDefinition(
223     const TableDefinition& defn,
224     const QueryConstraints& qc,
225     sqlite3_value** argv) {
226   std::vector<std::string> constraints;
227   for (size_t i = 0; i < qc.constraints().size(); i++) {
228     const auto& cs = qc.constraints()[i];
229     auto col_name = GetNameForGlobalColumnIndex(defn, cs.column);
230     if (col_name == "")
231       continue;
232 
233     if (col_name == kTsColumnName || col_name == kDurColumnName) {
234       // Allow SQLite handle any constraints on ts or duration.
235       continue;
236     }
237     auto op = sqlite_utils::OpToString(cs.op);
238     auto value = sqlite_utils::SqliteValueAsString(argv[i]);
239 
240     constraints.emplace_back("`" + col_name + "`" + op + value);
241   }
242   return constraints;
243 }
244 
CreateTableDefinition(const TableDescriptor & desc,EmitShadowType emit_shadow_type,SpanJoinOperatorTable::TableDefinition * defn)245 util::Status SpanJoinOperatorTable::CreateTableDefinition(
246     const TableDescriptor& desc,
247     EmitShadowType emit_shadow_type,
248     SpanJoinOperatorTable::TableDefinition* defn) {
249   if (desc.partition_col == kTsColumnName ||
250       desc.partition_col == kDurColumnName) {
251     return util::ErrStatus(
252         "SPAN_JOIN: partition column cannot be any of {ts, dur} for table %s",
253         desc.name.c_str());
254   }
255 
256   auto cols = sqlite_utils::GetColumnsForTable(db_, desc.name);
257 
258   uint32_t required_columns_found = 0;
259   uint32_t ts_idx = std::numeric_limits<uint32_t>::max();
260   uint32_t dur_idx = std::numeric_limits<uint32_t>::max();
261   uint32_t partition_idx = std::numeric_limits<uint32_t>::max();
262   for (uint32_t i = 0; i < cols.size(); i++) {
263     auto col = cols[i];
264     if (IsRequiredColumn(col.name())) {
265       ++required_columns_found;
266       if (col.type() != SqlValue::Type::kLong &&
267           col.type() != SqlValue::Type::kNull) {
268         return util::ErrStatus(
269             "SPAN_JOIN: Invalid type for column %s in table %s",
270             col.name().c_str(), desc.name.c_str());
271       }
272     }
273 
274     if (col.name() == kTsColumnName) {
275       ts_idx = i;
276     } else if (col.name() == kDurColumnName) {
277       dur_idx = i;
278     } else if (col.name() == desc.partition_col) {
279       partition_idx = i;
280     }
281   }
282   if (required_columns_found != 2) {
283     return util::ErrStatus(
284         "SPAN_JOIN: Missing one of columns {ts, dur} in table %s",
285         desc.name.c_str());
286   } else if (desc.IsPartitioned() && partition_idx >= cols.size()) {
287     return util::ErrStatus("SPAN_JOIN: Missing partition column %s in table %s",
288                            desc.partition_col.c_str(), desc.name.c_str());
289   }
290 
291   PERFETTO_DCHECK(ts_idx < cols.size());
292   PERFETTO_DCHECK(dur_idx < cols.size());
293 
294   *defn = TableDefinition(desc.name, desc.partition_col, std::move(cols),
295                           emit_shadow_type, ts_idx, dur_idx, partition_idx);
296   return util::OkStatus();
297 }
298 
GetNameForGlobalColumnIndex(const TableDefinition & defn,int global_column)299 std::string SpanJoinOperatorTable::GetNameForGlobalColumnIndex(
300     const TableDefinition& defn,
301     int global_column) {
302   size_t col_idx = static_cast<size_t>(global_column);
303   if (col_idx == Column::kTimestamp)
304     return kTsColumnName;
305   else if (col_idx == Column::kDuration)
306     return kDurColumnName;
307   else if (col_idx == Column::kPartition &&
308            partitioning_ != PartitioningType::kNoPartitioning)
309     return defn.partition_col().c_str();
310 
311   const auto& locator = global_index_to_column_locator_[col_idx];
312   if (locator.defn != &defn)
313     return "";
314   return defn.columns()[locator.col_index].name().c_str();
315 }
316 
Cursor(SpanJoinOperatorTable * table,sqlite3 * db)317 SpanJoinOperatorTable::Cursor::Cursor(SpanJoinOperatorTable* table, sqlite3* db)
318     : SqliteTable::Cursor(table),
319       t1_(table, &table->t1_defn_, db),
320       t2_(table, &table->t2_defn_, db),
321       table_(table) {}
322 
Filter(const QueryConstraints & qc,sqlite3_value ** argv,FilterHistory)323 int SpanJoinOperatorTable::Cursor::Filter(const QueryConstraints& qc,
324                                           sqlite3_value** argv,
325                                           FilterHistory) {
326   util::Status status = t1_.Initialize(qc, argv);
327   if (!status.ok())
328     return SQLITE_ERROR;
329 
330   status = t2_.Initialize(qc, argv);
331   if (!status.ok())
332     return SQLITE_ERROR;
333 
334   status = FindOverlappingSpan();
335   return status.ok() ? SQLITE_OK : SQLITE_ERROR;
336 }
337 
Next()338 int SpanJoinOperatorTable::Cursor::Next() {
339   util::Status status = next_query_->Next();
340   if (!status.ok())
341     return SQLITE_ERROR;
342 
343   status = FindOverlappingSpan();
344   return status.ok() ? SQLITE_OK : SQLITE_ERROR;
345 }
346 
// Returns true if the current slices of |t1_| and |t2_| overlap, so a span
// join row should be emitted for them. They must overlap in time and, for
// same-partitioned tables, in partition range as well.
bool SpanJoinOperatorTable::Cursor::IsOverlappingSpan() {
  // If either of the tables are eof, then we cannot possibly have an
  // overlapping span.
  if (t1_.IsEof() || t2_.IsEof())
    return false;

  // One of the tables always needs to have a real span to have a valid
  // overlapping span.
  if (!t1_.IsReal() && !t2_.IsReal())
    return false;

  if (table_->partitioning_ == PartitioningType::kSamePartitioning) {
    // If both tables are partitioned, then ensure that the partitions overlap.
    // Each side covers the inclusive partition range
    // [FirstPartition(), LastPartition()]. Presumably this is a single
    // partition except for missing-partition shadows; confirm in the header.
    bool partition_in_bounds = (t1_.FirstPartition() >= t2_.FirstPartition() &&
                                t1_.FirstPartition() <= t2_.LastPartition()) ||
                               (t2_.FirstPartition() >= t1_.FirstPartition() &&
                                t2_.FirstPartition() <= t1_.LastPartition());
    if (!partition_in_bounds)
      return false;
  }

  // We consider all slices to be [start, end) - that is, the range of
  // timestamps is closed at the start but open at the end. The exception is
  // dur == -1, which we treat as if end == start for the purpose of this
  // function. Two real slices starting at the same timestamp always count as
  // overlapping, even if one of them has zero length.
  return (t1_.ts() == t2_.ts() && t1_.IsReal() && t2_.IsReal()) ||
         (t1_.ts() >= t2_.ts() && t1_.ts() < t2_.AdjustedTsEnd()) ||
         (t2_.ts() >= t1_.ts() && t2_.ts() < t1_.AdjustedTsEnd());
}
376 
// Advances the two queries until their current slices overlap (see
// IsOverlappingSpan) or the join is exhausted. Leaves |next_query_| pointing
// at the query that the next call to Next() should step.
//
// Returns an error status if rewinding or stepping either query fails.
util::Status SpanJoinOperatorTable::Cursor::FindOverlappingSpan() {
  // We loop until we find a slice which overlaps from the two tables.
  while (true) {
    if (table_->partitioning_ == PartitioningType::kMixedPartitioning) {
      // If we have a mixed partition setup, we need special checks for eof.
      // We also need to reset the unpartitioned cursor every time the
      // partition changes in the partitioned table.
      auto* partitioned = t1_.definition()->IsPartitioned() ? &t1_ : &t2_;
      auto* unpartitioned = t1_.definition()->IsPartitioned() ? &t2_ : &t1_;

      // If the partitioned table reaches eof, then we are really done.
      if (partitioned->IsEof())
        break;

      // If the partition has changed from the previous one, reset the cursor
      // and keep track of the new partition.
      if (last_mixed_partition_ != partitioned->partition()) {
        util::Status status = unpartitioned->Rewind();
        if (!status.ok())
          return status;
        last_mixed_partition_ = partitioned->partition();
      }
    } else if (t1_.IsEof() || t2_.IsEof()) {
      // For both no partition and same partition cases, either cursor ending
      // ends the whole span join.
      break;
    }

    // Find which slice finishes first.
    next_query_ = FindEarliestFinishQuery();

    // If the current span is overlapping, just finish there to emit the
    // current slice.
    if (IsOverlappingSpan())
      break;

    // Otherwise, step to the next row.
    util::Status status = next_query_->Next();
    if (!status.ok())
      return status;
  }
  return util::OkStatus();
}
420 
421 SpanJoinOperatorTable::Query*
FindEarliestFinishQuery()422 SpanJoinOperatorTable::Cursor::FindEarliestFinishQuery() {
423   int64_t t1_part;
424   int64_t t2_part;
425 
426   switch (table_->partitioning_) {
427     case PartitioningType::kMixedPartitioning: {
428       // If either table is EOF, forward the other table to try and make
429       // the partitions not match anymore.
430       if (t1_.IsEof())
431         return &t2_;
432       if (t2_.IsEof())
433         return &t1_;
434 
435       // Otherwise, just make the partition equal from both tables.
436       t1_part = last_mixed_partition_;
437       t2_part = last_mixed_partition_;
438       break;
439     }
440     case PartitioningType::kSamePartitioning: {
441       // Get the partition values from the cursor.
442       t1_part = t1_.LastPartition();
443       t2_part = t2_.LastPartition();
444       break;
445     }
446     case PartitioningType::kNoPartitioning: {
447       t1_part = 0;
448       t2_part = 0;
449       break;
450     }
451   }
452 
453   // Prefer to forward the earliest cursors based on the following
454   // lexiographical ordering:
455   // 1. partition
456   // 2. end timestamp
457   // 3. whether the slice is real or shadow (shadow < real)
458   bool t1_less = std::make_tuple(t1_part, t1_.AdjustedTsEnd(), t1_.IsReal()) <
459                  std::make_tuple(t2_part, t2_.AdjustedTsEnd(), t2_.IsReal());
460   return t1_less ? &t1_ : &t2_;
461 }
462 
Eof()463 int SpanJoinOperatorTable::Cursor::Eof() {
464   return t1_.IsEof() || t2_.IsEof();
465 }
466 
Column(sqlite3_context * context,int N)467 int SpanJoinOperatorTable::Cursor::Column(sqlite3_context* context, int N) {
468   PERFETTO_DCHECK(t1_.IsReal() || t2_.IsReal());
469 
470   switch (N) {
471     case Column::kTimestamp: {
472       auto max_ts = std::max(t1_.ts(), t2_.ts());
473       sqlite3_result_int64(context, static_cast<sqlite3_int64>(max_ts));
474       break;
475     }
476     case Column::kDuration: {
477       auto max_start = std::max(t1_.ts(), t2_.ts());
478       auto min_end = std::min(t1_.raw_ts_end(), t2_.raw_ts_end());
479       auto dur = min_end - max_start;
480       sqlite3_result_int64(context, static_cast<sqlite3_int64>(dur));
481       break;
482     }
483     case Column::kPartition: {
484       if (table_->partitioning_ != PartitioningType::kNoPartitioning) {
485         int64_t partition;
486         if (table_->partitioning_ == PartitioningType::kMixedPartitioning) {
487           partition = last_mixed_partition_;
488         } else {
489           partition = t1_.IsReal() ? t1_.partition() : t2_.partition();
490         }
491         sqlite3_result_int64(context, static_cast<sqlite3_int64>(partition));
492         break;
493       }
494       [[clang::fallthrough]];
495     }
496     default: {
497       size_t index = static_cast<size_t>(N);
498       const auto& locator = table_->global_index_to_column_locator_[index];
499       if (locator.defn == t1_.definition())
500         t1_.ReportSqliteResult(context, locator.col_index);
501       else
502         t2_.ReportSqliteResult(context, locator.col_index);
503     }
504   }
505   return SQLITE_OK;
506 }
507 
Query(SpanJoinOperatorTable * table,const TableDefinition * definition,sqlite3 * db)508 SpanJoinOperatorTable::Query::Query(SpanJoinOperatorTable* table,
509                                     const TableDefinition* definition,
510                                     sqlite3* db)
511     : defn_(definition), db_(db), table_(table) {
512   PERFETTO_DCHECK(!defn_->IsPartitioned() ||
513                   defn_->partition_idx() < defn_->columns().size());
514 }
515 
// Defaulted out of line, in this file rather than the header. NOTE(review):
// presumably so that member destructors (e.g. |stmt_|'s deleter) are only
// instantiated here; confirm against the header.
SpanJoinOperatorTable::Query::~Query() = default;
517 
Initialize(const QueryConstraints & qc,sqlite3_value ** argv)518 util::Status SpanJoinOperatorTable::Query::Initialize(
519     const QueryConstraints& qc,
520     sqlite3_value** argv) {
521   *this = Query(table_, definition(), db_);
522   sql_query_ = CreateSqlQuery(
523       table_->ComputeSqlConstraintsForDefinition(*defn_, qc, argv));
524   return Rewind();
525 }
526 
Next()527 util::Status SpanJoinOperatorTable::Query::Next() {
528   util::Status status = NextSliceState();
529   if (!status.ok())
530     return status;
531   return FindNextValidSlice();
532 }
533 
IsValidSlice()534 bool SpanJoinOperatorTable::Query::IsValidSlice() {
535   // Disallow any single partition shadow slices if the definition doesn't allow
536   // them.
537   if (IsPresentPartitionShadow() && !defn_->ShouldEmitPresentPartitionShadow())
538     return false;
539 
540   // Disallow any missing partition shadow slices if the definition doesn't
541   // allow them.
542   if (IsMissingPartitionShadow() && !defn_->ShouldEmitMissingPartitionShadow())
543     return false;
544 
545   // Disallow any "empty" shadows; these are shadows which either have the same
546   // start and end time or missing-partition shadows which have the same start
547   // and end partition.
548   if (IsEmptyShadow())
549     return false;
550 
551   return true;
552 }
553 
FindNextValidSlice()554 util::Status SpanJoinOperatorTable::Query::FindNextValidSlice() {
555   // The basic idea of this function is that |NextSliceState()| always emits
556   // all possible slices (including shadows for any gaps inbetween the real
557   // slices) and we filter out the invalid slices (as defined by the table
558   // definition) using |IsValidSlice()|.
559   //
560   // This has proved to be a lot cleaner to implement than trying to choose
561   // when to emit and not emit shadows directly.
562   while (!IsEof() && !IsValidSlice()) {
563     util::Status status = NextSliceState();
564     if (!status.ok())
565       return status;
566   }
567   return util::OkStatus();
568 }
569 
// Advances the slice state machine by exactly one step, with no filtering
// (see FindNextValidSlice). The transitions are:
//   kReal -> kPresentPartitionShadow: the gap after the real slice, up to the
//       next slice in the same partition or to the end of the partition.
//   kPresentPartitionShadow -> kReal: the next slice in the same partition.
//   kPresentPartitionShadow -> kMissingPartitionShadow: the range of
//       partitions with no slices at all.
//   kMissingPartitionShadow -> kPresentPartitionShadow: from ts 0 up to the
//       first slice of the next partition.
//   kMissingPartitionShadow -> kEof.
// Returns an error status if stepping the SQLite cursor fails or if called
// while already at kEof.
util::Status SpanJoinOperatorTable::Query::NextSliceState() {
  switch (state_) {
    case State::kReal: {
      // Forward the cursor to figure out where the next slice should be.
      util::Status status = CursorNext();
      if (!status.ok())
        return status;

      // Depending on the next slice, we can do two things here:
      // 1. If the next slice is on the same partition, we can just emit a
      //    single shadow until the start of the next slice.
      // 2. If the next slice is on another partition or we hit eof, just emit
      //    a shadow to the end of the whole partition.
      bool shadow_to_end = cursor_eof_ || (defn_->IsPartitioned() &&
                                           partition_ != CursorPartition());
      state_ = State::kPresentPartitionShadow;
      ts_ = AdjustedTsEnd();
      ts_end_ =
          shadow_to_end ? std::numeric_limits<int64_t>::max() : CursorTs();
      return util::OkStatus();
    }
    case State::kPresentPartitionShadow: {
      if (ts_end_ == std::numeric_limits<int64_t>::max()) {
        // If the shadow runs to the end of the partition, create a missing
        // partition shadow. It runs up to the partition of the next slice, or
        // to the max partition if we hit eof.
        state_ = State::kMissingPartitionShadow;
        ts_ = 0;
        ts_end_ = std::numeric_limits<int64_t>::max();

        // NOTE(review): if |partition_| is a signed 64-bit value equal to its
        // maximum, this increment overflows (UB). Presumably real partitions
        // never reach that value; confirm.
        missing_partition_start_ = partition_ + 1;
        missing_partition_end_ = cursor_eof_
                                     ? std::numeric_limits<int64_t>::max()
                                     : CursorPartition();
      } else {
        // If the shadow is not to the end, we must have another slice on the
        // current partition.
        state_ = State::kReal;
        ts_ = CursorTs();
        ts_end_ = ts_ + CursorDur();

        PERFETTO_DCHECK(!defn_->IsPartitioned() ||
                        partition_ == CursorPartition());
      }
      return util::OkStatus();
    }
    case State::kMissingPartitionShadow: {
      if (missing_partition_end_ == std::numeric_limits<int64_t>::max()) {
        PERFETTO_DCHECK(cursor_eof_);

        // If we have a missing partition to the max partition, we must have hit
        // eof.
        state_ = State::kEof;
      } else {
        PERFETTO_DCHECK(!defn_->IsPartitioned() ||
                        CursorPartition() == missing_partition_end_);

        // Otherwise, setup a single partition slice on the end partition to the
        // start of the next slice.
        state_ = State::kPresentPartitionShadow;
        ts_ = 0;
        ts_end_ = CursorTs();
        partition_ = missing_partition_end_;
      }
      return util::OkStatus();
    }
    case State::kEof: {
      PERFETTO_DFATAL("Called Next when EOF");
      return util::ErrStatus("Called Next when EOF");
    }
  }
  PERFETTO_FATAL("For GCC");
}
643 
// (Re)prepares |sql_query_| and steps to its first row. It then positions the
// state machine on the first valid slice.
//
// The initial state is a missing-partition shadow covering every partition
// before the first row's partition, or all partitions if the query returned
// no rows. For unpartitioned tables that range is empty (start == end ==
// min). Returns an error status with SQLite's message if preparing or
// stepping the statement fails.
util::Status SpanJoinOperatorTable::Query::Rewind() {
  sqlite3_stmt* stmt = nullptr;
  int res =
      sqlite3_prepare_v2(db_, sql_query_.c_str(),
                         static_cast<int>(sql_query_.size()), &stmt, nullptr);
  // Take ownership even on failure so any previous statement is released.
  stmt_.reset(stmt);

  cursor_eof_ = res != SQLITE_OK;
  if (res != SQLITE_OK)
    return util::ErrStatus("%s", sqlite3_errmsg(db_));

  util::Status status = CursorNext();
  if (!status.ok())
    return status;

  // Setup the first slice as a missing partition shadow from the lowest
  // partition until the first slice partition. We will handle finding the real
  // slice in |FindNextValidSlice()|.
  state_ = State::kMissingPartitionShadow;
  ts_ = 0;
  ts_end_ = std::numeric_limits<int64_t>::max();
  missing_partition_start_ = std::numeric_limits<int64_t>::min();

  if (cursor_eof_) {
    missing_partition_end_ = std::numeric_limits<int64_t>::max();
  } else if (defn_->IsPartitioned()) {
    missing_partition_end_ = CursorPartition();
  } else {
    missing_partition_end_ = std::numeric_limits<int64_t>::min();
  }

  // Actually compute the first valid slice.
  return FindNextValidSlice();
}
678 
CursorNext()679 util::Status SpanJoinOperatorTable::Query::CursorNext() {
680   auto* stmt = stmt_.get();
681   int res;
682   if (defn_->IsPartitioned()) {
683     auto partition_idx = static_cast<int>(defn_->partition_idx());
684     // Fastforward through any rows with null partition keys.
685     int row_type;
686     do {
687       res = sqlite3_step(stmt);
688       row_type = sqlite3_column_type(stmt, partition_idx);
689     } while (res == SQLITE_ROW && row_type == SQLITE_NULL);
690   } else {
691     res = sqlite3_step(stmt);
692   }
693   cursor_eof_ = res != SQLITE_ROW;
694   return res == SQLITE_ROW || res == SQLITE_DONE
695              ? util::OkStatus()
696              : util::ErrStatus("%s", sqlite3_errmsg(db_));
697 }
698 
CreateSqlQuery(const std::vector<std::string> & cs) const699 std::string SpanJoinOperatorTable::Query::CreateSqlQuery(
700     const std::vector<std::string>& cs) const {
701   std::vector<std::string> col_names;
702   for (const SqliteTable::Column& c : defn_->columns()) {
703     col_names.push_back("`" + c.name() + "`");
704   }
705 
706   std::string sql = "SELECT " + base::Join(col_names, ", ");
707   sql += " FROM " + defn_->name();
708   if (!cs.empty()) {
709     sql += " WHERE " + base::Join(cs, " AND ");
710   }
711   sql += " ORDER BY ";
712   sql += defn_->IsPartitioned()
713              ? base::Join({"`" + defn_->partition_col() + "`", "ts"}, ", ")
714              : "ts";
715   sql += ";";
716   PERFETTO_DLOG("%s", sql.c_str());
717   return sql;
718 }
719 
ReportSqliteResult(sqlite3_context * context,size_t index)720 void SpanJoinOperatorTable::Query::ReportSqliteResult(sqlite3_context* context,
721                                                       size_t index) {
722   if (state_ != State::kReal) {
723     sqlite3_result_null(context);
724     return;
725   }
726 
727   sqlite3_stmt* stmt = stmt_.get();
728   int idx = static_cast<int>(index);
729   switch (sqlite3_column_type(stmt, idx)) {
730     case SQLITE_INTEGER:
731       sqlite3_result_int64(context, sqlite3_column_int64(stmt, idx));
732       break;
733     case SQLITE_FLOAT:
734       sqlite3_result_double(context, sqlite3_column_double(stmt, idx));
735       break;
736     case SQLITE_TEXT: {
737       // TODO(lalitm): note for future optimizations: if we knew the addresses
738       // of the string intern pool, we could check if the string returned here
739       // comes from the pool, and pass it as non-transient.
740       const auto kSqliteTransient =
741           reinterpret_cast<sqlite3_destructor_type>(-1);
742       auto ptr = reinterpret_cast<const char*>(sqlite3_column_text(stmt, idx));
743       sqlite3_result_text(context, ptr, -1, kSqliteTransient);
744       break;
745     }
746   }
747 }
748 
// Describes one side of the span join. It records the table name, its
// partition column (empty if unpartitioned) and its columns. It also records
// which shadow slices to emit and the indices of the ts, dur and partition
// columns within |cols|. The partition index is the max uint32_t sentinel if
// the column was not found (see CreateTableDefinition).
SpanJoinOperatorTable::TableDefinition::TableDefinition(
    std::string name,
    std::string partition_col,
    std::vector<SqliteTable::Column> cols,
    EmitShadowType emit_shadow_type,
    uint32_t ts_idx,
    uint32_t dur_idx,
    uint32_t partition_idx)
    : emit_shadow_type_(emit_shadow_type),
      name_(std::move(name)),
      partition_col_(std::move(partition_col)),
      cols_(std::move(cols)),
      ts_idx_(ts_idx),
      dur_idx_(dur_idx),
      partition_idx_(partition_idx) {}
764 
Parse(const std::string & raw_descriptor,SpanJoinOperatorTable::TableDescriptor * descriptor)765 util::Status SpanJoinOperatorTable::TableDescriptor::Parse(
766     const std::string& raw_descriptor,
767     SpanJoinOperatorTable::TableDescriptor* descriptor) {
768   // Descriptors have one of the following forms:
769   // table_name [PARTITIONED column_name]
770 
771   // Find the table name.
772   base::StringSplitter splitter(raw_descriptor, ' ');
773   if (!splitter.Next())
774     return util::ErrStatus("SPAN_JOIN: Missing table name");
775 
776   descriptor->name = splitter.cur_token();
777   if (!splitter.Next())
778     return util::OkStatus();
779 
780   if (!base::CaseInsensitiveEqual(splitter.cur_token(), "PARTITIONED"))
781     return util::ErrStatus("SPAN_JOIN: Invalid token");
782 
783   if (!splitter.Next())
784     return util::ErrStatus("SPAN_JOIN: Missing partitioning column");
785 
786   descriptor->partition_col = splitter.cur_token();
787   return util::OkStatus();
788 }
789 
790 }  // namespace trace_processor
791 }  // namespace perfetto
792