diff --git a/serving/src/main/java/feast/serving/store/bigquery/BatchRetrievalQueryRunnable.java b/serving/src/main/java/feast/serving/store/bigquery/BatchRetrievalQueryRunnable.java
index d437294dfc3..e875de35a80 100644
--- a/serving/src/main/java/feast/serving/store/bigquery/BatchRetrievalQueryRunnable.java
+++ b/serving/src/main/java/feast/serving/store/bigquery/BatchRetrievalQueryRunnable.java
@@ -52,6 +52,27 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+/**
+ * BatchRetrievalQueryRunnable is a Runnable for running a BigQuery Feast batch retrieval job async.
+ *
+ *
It does the following, in sequence:
+ *
+ *
1. Retrieve the temporal bounds of the entity dataset provided. This will be used to filter
+ * the feature set tables when performing the feature retrieval.
+ *
+ *
2. For each of the feature sets requested, generate the subquery for doing a point-in-time
+ * correctness join of the features in the feature set to the entity table.
+ *
+ *
3. Run each of the subqueries in parallel and wait for them to complete. If any of the jobs
+ * are unsuccessful, the thread running the BatchRetrievalQueryRunnable catches the error and
+ * updates the job database.
+ *
+ *
4. When all the subquery jobs are complete, join the outputs of all the subqueries into a
+ * single table.
+ *
+ *
5. Extract the output of the join to a remote file, and write the location of the remote file
+ * to the job database, and mark the retrieval job as successful.
+ */
@AutoValue
public abstract class BatchRetrievalQueryRunnable implements Runnable {
@@ -109,18 +130,22 @@ public abstract static class Builder {
@Override
public void run() {
+ // 1. Retrieve the temporal bounds of the entity dataset provided
FieldValueList timestampLimits = getTimestampLimits(entityTableName());
+ // 2. Generate the subqueries
List featureSetQueries = generateQueries(timestampLimits);
QueryJobConfiguration queryConfig;
try {
+ // 3 & 4. Run the subqueries in parallel then collect the outputs
Job queryJob = runBatchQuery(featureSetQueries);
queryConfig = queryJob.getConfiguration();
String exportTableDestinationUri =
String.format("%s/%s/*.avro", jobStagingLocation(), feastJobId());
+ // 5. Export the table
// Hardcode the format to Avro for now
ExtractJobConfiguration extractConfig =
ExtractJobConfiguration.of(
@@ -141,6 +166,7 @@ public void run() {
List fileUris = parseOutputFileURIs();
+ // 5. Update the job database
jobService()
.upsert(
ServingAPIProto.Job.newBuilder()
@@ -181,6 +207,8 @@ Job runBatchQuery(List featureSetQueries)
List featureSetInfos = new ArrayList<>();
+ // For each of the feature sets requested, start an async job joining the features in that
+ // feature set to the provided entity table
for (int i = 0; i < featureSetQueries.size(); i++) {
QueryJobConfiguration queryJobConfig =
QueryJobConfiguration.newBuilder(featureSetQueries.get(i))
@@ -197,6 +225,8 @@ Job runBatchQuery(List featureSetQueries)
for (int i = 0; i < featureSetQueries.size(); i++) {
try {
+ // Try to retrieve the outputs of all the jobs. The timeout here is a formality;
+ // a stricter timeout is implemented in the actual SubqueryCallable.
FeatureSetInfo featureSetInfo =
executorCompletionService.take().get(SUBQUERY_TIMEOUT_SECS, TimeUnit.SECONDS);
featureSetInfos.add(featureSetInfo);
@@ -218,6 +248,8 @@ Job runBatchQuery(List featureSetQueries)
}
}
+ // Generate and run a join query to collect the outputs of all the
+ // subqueries into a single table.
String joinQuery =
QueryTemplater.createJoinQuery(
featureSetInfos, entityTableColumnNames(), entityTableName());
diff --git a/serving/src/main/java/feast/serving/store/bigquery/SubqueryCallable.java b/serving/src/main/java/feast/serving/store/bigquery/SubqueryCallable.java
index e0b8f457986..14026030b42 100644
--- a/serving/src/main/java/feast/serving/store/bigquery/SubqueryCallable.java
+++ b/serving/src/main/java/feast/serving/store/bigquery/SubqueryCallable.java
@@ -30,8 +30,8 @@
import java.util.concurrent.Callable;
/**
- * Waits for a bigquery job to complete; when complete, it updates the feature set info with the
- * output table name, as well as increments the completed jobs counter in the query job listener.
+ * Waits for a point-in-time correctness join to complete. On completion, returns a featureSetInfo
+ * updated with the reference to the table containing the results of the query.
*/
@AutoValue
public abstract class SubqueryCallable implements Callable {
diff --git a/serving/src/main/resources/templates/join_featuresets.sql b/serving/src/main/resources/templates/join_featuresets.sql
index e57b0c10314..60b7c7d7a12 100644
--- a/serving/src/main/resources/templates/join_featuresets.sql
+++ b/serving/src/main/resources/templates/join_featuresets.sql
@@ -1,3 +1,6 @@
+/*
+ Joins the outputs of multiple point-in-time-correctness joins to a single table.
+ */
WITH joined as (
SELECT * FROM `{{ leftTableName }}`
{% for featureSet in featureSets %}
diff --git a/serving/src/main/resources/templates/single_featureset_pit_join.sql b/serving/src/main/resources/templates/single_featureset_pit_join.sql
index f6678421851..1f4612b3503 100644
--- a/serving/src/main/resources/templates/single_featureset_pit_join.sql
+++ b/serving/src/main/resources/templates/single_featureset_pit_join.sql
@@ -1,9 +1,24 @@
-WITH union_features AS (SELECT
+/*
+ This query template performs the point-in-time correctness join for a single feature set table
+ to the provided entity table.
+
+ 1. Concatenate the timestamp and entities from the feature set table with the entity dataset.
+ Feature values are joined to this table later for improved efficiency.
+ featureset_timestamp is equal to null in rows from the entity dataset.
+ */
+WITH union_features AS (
+SELECT
+ -- uuid is a unique identifier for each row in the entity dataset. Generated by `QueryTemplater.createEntityTableUUIDQuery`
uuid,
+ -- event_timestamp contains the timestamps to join onto
event_timestamp,
+ -- the feature_timestamp, i.e. the latest occurrence of the requested feature relative to the entity_dataset timestamp
NULL as {{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp,
+ -- created timestamp of the feature at the corresponding feature_timestamp
NULL as created_timestamp,
+ -- select only entities belonging to this feature set
{{ featureSet.entities | join(', ')}},
+ -- boolean for filtering the dataset later
true AS is_entity_table
FROM `{{leftTableName}}`
UNION ALL
@@ -15,7 +30,18 @@ SELECT
{{ featureSet.entities | join(', ')}},
false AS is_entity_table
FROM `{{projectId}}.{{datasetId}}.{{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}` WHERE event_timestamp <= '{{maxTimestamp}}' AND event_timestamp >= Timestamp_sub(TIMESTAMP '{{ minTimestamp }}', interval {{ featureSet.maxAge }} second)
-), joined AS (
+),
+/*
+ 2. Window the data in the unioned dataset, partitioning by entity and ordering by event_timestamp, as
+ well as is_entity_table.
+ Within each window, back-fill the feature_timestamp - as a result of this, the null feature_timestamps
+ in the rows from the entity table should now contain the latest timestamps relative to the row's
+ event_timestamp.
+
+ For rows where event_timestamp(provided datetime) - feature_timestamp > max age, set the
+ feature_timestamp to null.
+ */
+joined AS (
SELECT
uuid,
event_timestamp,
@@ -34,6 +60,10 @@ SELECT
FROM union_features
WINDOW w AS (PARTITION BY {{ featureSet.entities | join(', ') }} ORDER BY event_timestamp DESC, is_entity_table DESC, created_timestamp DESC ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)
)
+/*
+ 3. Select only the rows from the entity table, and join the features from the original feature set table
+ to the dataset using the entity values, feature_timestamp, and created_timestamps.
+ */
LEFT JOIN (
SELECT
event_timestamp as {{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp,
@@ -46,6 +76,9 @@ FROM `{{projectId}}.{{datasetId}}.{{ featureSet.project }}_{{ featureSet.name }}
) USING ({{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp, created_timestamp, {{ featureSet.entities | join(', ')}})
WHERE is_entity_table
)
+/*
+ 4. Finally, deduplicate the rows by selecting the first occurrence of each entity table row UUID.
+ */
SELECT
k.*
FROM (