From 9201f7d93271685c784bc5886c09ff411fe461b5 Mon Sep 17 00:00:00 2001 From: zhilingc Date: Fri, 3 Apr 2020 20:41:21 +0800 Subject: [PATCH 1/3] Add better code documentation, make GetFeastServingInfo independent of retriever --- .../configuration/ServingServiceConfig.java | 3 +- .../service/HistoricalServingService.java | 24 +++---- .../serving/service/OnlineServingService.java | 15 ++++- .../feast/serving/service/ServingService.java | 63 +++++++++++++++++++ 4 files changed, 89 insertions(+), 16 deletions(-) diff --git a/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java b/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java index a149b3fa14c..5a901567126 100644 --- a/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java +++ b/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java @@ -117,7 +117,8 @@ public ServingService servingService( .setStorage(storage) .build(); - servingService = new HistoricalServingService(bqRetriever, specService, jobService); + servingService = + new HistoricalServingService(bqRetriever, specService, jobService, jobStagingLocation); break; case CASSANDRA: case UNRECOGNIZED: diff --git a/serving/src/main/java/feast/serving/service/HistoricalServingService.java b/serving/src/main/java/feast/serving/service/HistoricalServingService.java index 7d389a9e12d..c8353958822 100644 --- a/serving/src/main/java/feast/serving/service/HistoricalServingService.java +++ b/serving/src/main/java/feast/serving/service/HistoricalServingService.java @@ -23,7 +23,6 @@ import feast.storage.api.retrieval.FeatureSetRequest; import feast.storage.api.retrieval.HistoricalRetrievalResult; import feast.storage.api.retrieval.HistoricalRetriever; -import feast.storage.connectors.bigquery.retrieval.BigQueryHistoricalRetriever; import io.grpc.Status; import java.util.List; import java.util.Optional; @@ -38,30 +37,27 @@ public class HistoricalServingService implements ServingService 
{ private final HistoricalRetriever retriever; private final CachedSpecService specService; private final JobService jobService; + private String stagingLocation; public HistoricalServingService( - HistoricalRetriever retriever, CachedSpecService specService, JobService jobService) { + HistoricalRetriever retriever, + CachedSpecService specService, + JobService jobService, + String stagingLocation) { this.retriever = retriever; this.specService = specService; this.jobService = jobService; + this.stagingLocation = stagingLocation; } /** {@inheritDoc} */ @Override public GetFeastServingInfoResponse getFeastServingInfo( GetFeastServingInfoRequest getFeastServingInfoRequest) { - try { - BigQueryHistoricalRetriever bigQueryHistoricalRetriever = - (BigQueryHistoricalRetriever) retriever; - return GetFeastServingInfoResponse.newBuilder() - .setType(FeastServingType.FEAST_SERVING_TYPE_BATCH) - .setJobStagingLocation(bigQueryHistoricalRetriever.jobStagingLocation()) - .build(); - } catch (Exception e) { - return GetFeastServingInfoResponse.newBuilder() - .setType(FeastServingType.FEAST_SERVING_TYPE_BATCH) - .build(); - } + return GetFeastServingInfoResponse.newBuilder() + .setType(FeastServingType.FEAST_SERVING_TYPE_BATCH) + .setJobStagingLocation(stagingLocation) + .build(); } /** {@inheritDoc} */ diff --git a/serving/src/main/java/feast/serving/service/OnlineServingService.java b/serving/src/main/java/feast/serving/service/OnlineServingService.java index 4e0baed17b4..c6f2178a7f2 100644 --- a/serving/src/main/java/feast/serving/service/OnlineServingService.java +++ b/serving/src/main/java/feast/serving/service/OnlineServingService.java @@ -74,21 +74,32 @@ public GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequest requ Map> featureValuesMap = entityRows.stream() .collect(Collectors.toMap(row -> row, row -> Maps.newHashMap(row.getFieldsMap()))); - + // Get all feature rows from the retriever. 
Each feature row list corresponds to a single + // feature set request. List> featureRows = retriever.getOnlineFeatures(entityRows, featureSetRequests); + // For each feature set request, read the feature rows returned by the retriever, and + // populate the featureValuesMap with the feature values corresponding to that entity row. for (var fsIdx = 0; fsIdx < featureRows.size(); fsIdx++) { List featureRowsForFs = featureRows.get(fsIdx); FeatureSetRequest featureSetRequest = featureSetRequests.get(fsIdx); + + // In order to return values containing the same feature references provided by the user, + // we reuse the feature references in the request as the keys in the featureValuesMap Map featureNames = featureSetRequest.getFeatureReferences().stream() .collect( Collectors.toMap( FeatureReference::getName, featureReference -> featureReference)); + + // Each feature row returned (per feature set request) corresponds to a given entity row. + // For each feature row, update the featureValuesMap. for (var entityRowIdx = 0; entityRowIdx < entityRows.size(); entityRowIdx++) { FeatureRow featureRow = featureRowsForFs.get(entityRowIdx); EntityRow entityRow = entityRows.get(entityRowIdx); + + // If the row is stale, put an empty value into the featureValuesMap. if (isStale(featureSetRequest, entityRow, featureRow)) { featureSetRequest .getFeatureReferences() @@ -117,6 +128,8 @@ public GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequest requ String.format("%s:%d", ref.getName(), ref.getVersion())) .inc()); + // Else populate the featureValueMap at this entityRow with the values in the feature + // row. 
featureRow.getFieldsList().stream() .filter(field -> featureNames.containsKey(field.getName())) .forEach( diff --git a/serving/src/main/java/feast/serving/service/ServingService.java b/serving/src/main/java/feast/serving/service/ServingService.java index 83adcb73ba8..5e662229eeb 100644 --- a/serving/src/main/java/feast/serving/service/ServingService.java +++ b/serving/src/main/java/feast/serving/service/ServingService.java @@ -26,12 +26,75 @@ import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse; public interface ServingService { + /** + * Get information about the Feast serving deployment. + * + *

<p>For BigQuery deployments, this includes the default job staging location to load + * intermediate files to. Otherwise, this method only returns the current Feast Serving backing + * store type. + * + * @param getFeastServingInfoRequest {@link GetFeastServingInfoRequest} + * @return {@link GetFeastServingInfoResponse} + */ GetFeastServingInfoResponse getFeastServingInfo( GetFeastServingInfoRequest getFeastServingInfoRequest); + /** + * Get features from an online serving store, given a list of {@link + * feast.serving.ServingAPIProto.FeatureReference}s to retrieve, and list of {@link + * feast.serving.ServingAPIProto.GetOnlineFeaturesRequest.EntityRow}s to join the retrieved values + * to. + * + *

Features can be queried across feature sets, but each {@link + * feast.serving.ServingAPIProto.GetOnlineFeaturesRequest.EntityRow} must contain all entities for + * all feature sets included in the request. + * + *

This request is fulfilled synchronously. + * + * @param getFeaturesRequest {@link GetOnlineFeaturesRequest} containing list of {@link + * feast.serving.ServingAPIProto.FeatureReference}s to retrieve and list of {@link + * feast.serving.ServingAPIProto.GetOnlineFeaturesRequest.EntityRow}s to join the retrieved + * values to. + * @return {@link GetOnlineFeaturesResponse} with list of {@link + * feast.serving.ServingAPIProto.GetOnlineFeaturesResponse.FieldValues} for each {@link + * feast.serving.ServingAPIProto.GetOnlineFeaturesRequest.EntityRow} supplied. + */ GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequest getFeaturesRequest); + /** + * Get features from a batch serving store, given a list of {@link + * feast.serving.ServingAPIProto.FeatureReference}s to retrieve, and {@link + * feast.serving.ServingAPIProto.DatasetSource} pointing to remote location of dataset to join + * retrieved features to. All columns in the provided dataset will be preserved in the output + * dataset. + * + *

Due to the potential size of batch retrieval requests, this request is fulfilled + * asynchronously, and returns a retrieval job id, which when supplied to {@link + * #getJob(GetJobRequest)} will return the status of the retrieval job. + * + * @param getFeaturesRequest {@link GetBatchFeaturesRequest} containing a list of {@link + * feast.serving.ServingAPIProto.FeatureReference}s to retrieve, and {@link + * feast.serving.ServingAPIProto.DatasetSource} pointing to remote location of dataset to join + * retrieved features to. + * @return {@link GetBatchFeaturesResponse} containing reference to a retrieval {@link + * feast.serving.ServingAPIProto.Job}. + */ GetBatchFeaturesResponse getBatchFeatures(GetBatchFeaturesRequest getFeaturesRequest); + /** + * Get the status of a retrieval job from a batch serving store. + * + *

<p>The client should check the status of the returned job periodically by calling GetJob to + * determine if the job has completed successfully or with an error. If the job completes + * successfully, i.e. status = JOB_STATUS_DONE with no error, then the client can check the + * file_uris for the location to download feature values data. The client is assumed to have + * access to these file URIs. + * + *

If an error occurred during retrieval, the {@link GetJobResponse} will also contain the + * error that resulted in termination. + * + * @param getJobRequest {@link GetJobRequest} containing reference to a retrieval job + * @return {@link GetJobResponse} + */ GetJobResponse getJob(GetJobRequest getJobRequest); } From e69007115f142df3e82b51d48176123d564b8bba Mon Sep 17 00:00:00 2001 From: zhilingc Date: Sat, 4 Apr 2020 10:56:26 +0800 Subject: [PATCH 2/3] Make getStagingLocation method of historical retriever --- .../feast/serving/configuration/ServingServiceConfig.java | 2 +- .../feast/serving/service/HistoricalServingService.java | 7 ++----- .../feast/storage/api/retrieval/HistoricalRetriever.java | 7 +++++++ .../bigquery/retrieval/BigQueryHistoricalRetriever.java | 5 +++++ 4 files changed, 15 insertions(+), 6 deletions(-) diff --git a/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java b/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java index 5a901567126..d252bd777e3 100644 --- a/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java +++ b/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java @@ -118,7 +118,7 @@ public ServingService servingService( .build(); servingService = - new HistoricalServingService(bqRetriever, specService, jobService, jobStagingLocation); + new HistoricalServingService(bqRetriever, specService, jobService); break; case CASSANDRA: case UNRECOGNIZED: diff --git a/serving/src/main/java/feast/serving/service/HistoricalServingService.java b/serving/src/main/java/feast/serving/service/HistoricalServingService.java index c8353958822..cc6f50896de 100644 --- a/serving/src/main/java/feast/serving/service/HistoricalServingService.java +++ b/serving/src/main/java/feast/serving/service/HistoricalServingService.java @@ -37,17 +37,14 @@ public class HistoricalServingService implements ServingService { private final HistoricalRetriever retriever; private final 
CachedSpecService specService; private final JobService jobService; - private String stagingLocation; public HistoricalServingService( HistoricalRetriever retriever, CachedSpecService specService, - JobService jobService, - String stagingLocation) { + JobService jobService) { this.retriever = retriever; this.specService = specService; this.jobService = jobService; - this.stagingLocation = stagingLocation; } /** {@inheritDoc} */ @@ -56,7 +53,7 @@ public GetFeastServingInfoResponse getFeastServingInfo( GetFeastServingInfoRequest getFeastServingInfoRequest) { return GetFeastServingInfoResponse.newBuilder() .setType(FeastServingType.FEAST_SERVING_TYPE_BATCH) - .setJobStagingLocation(stagingLocation) + .setJobStagingLocation(retriever.getStagingLocation()) .build(); } diff --git a/storage/api/src/main/java/feast/storage/api/retrieval/HistoricalRetriever.java b/storage/api/src/main/java/feast/storage/api/retrieval/HistoricalRetriever.java index 3533ed140f8..008f1229e09 100644 --- a/storage/api/src/main/java/feast/storage/api/retrieval/HistoricalRetriever.java +++ b/storage/api/src/main/java/feast/storage/api/retrieval/HistoricalRetriever.java @@ -25,6 +25,13 @@ */ public interface HistoricalRetriever { + /** + * Get temporary staging location if applicable. If not applicable to this store, returns an empty string. + * + * @return staging location uri + */ + String getStagingLocation(); + /** * Get all features corresponding to the provided batch features request. 
* diff --git a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/retrieval/BigQueryHistoricalRetriever.java b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/retrieval/BigQueryHistoricalRetriever.java index 63648604331..17efb070115 100644 --- a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/retrieval/BigQueryHistoricalRetriever.java +++ b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/retrieval/BigQueryHistoricalRetriever.java @@ -85,6 +85,11 @@ public abstract static class Builder { public abstract BigQueryHistoricalRetriever build(); } + @Override + public String getStagingLocation() { + return jobStagingLocation(); + } + @Override public HistoricalRetrievalResult getHistoricalFeatures( String retrievalId, DatasetSource datasetSource, List featureSetRequests) { From 2bf0f0b87874c00534ff5585ab12b066dc15bf72 Mon Sep 17 00:00:00 2001 From: zhilingc Date: Sat, 4 Apr 2020 10:59:41 +0800 Subject: [PATCH 3/3] Apply spotless --- .../feast/serving/configuration/ServingServiceConfig.java | 3 +-- .../java/feast/serving/service/HistoricalServingService.java | 4 +--- .../java/feast/storage/api/retrieval/HistoricalRetriever.java | 3 ++- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java b/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java index d252bd777e3..a149b3fa14c 100644 --- a/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java +++ b/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java @@ -117,8 +117,7 @@ public ServingService servingService( .setStorage(storage) .build(); - servingService = - new HistoricalServingService(bqRetriever, specService, jobService); + servingService = new HistoricalServingService(bqRetriever, specService, jobService); break; case CASSANDRA: case UNRECOGNIZED: diff --git 
a/serving/src/main/java/feast/serving/service/HistoricalServingService.java b/serving/src/main/java/feast/serving/service/HistoricalServingService.java index cc6f50896de..3c42c96d243 100644 --- a/serving/src/main/java/feast/serving/service/HistoricalServingService.java +++ b/serving/src/main/java/feast/serving/service/HistoricalServingService.java @@ -39,9 +39,7 @@ public class HistoricalServingService implements ServingService { private final JobService jobService; public HistoricalServingService( - HistoricalRetriever retriever, - CachedSpecService specService, - JobService jobService) { + HistoricalRetriever retriever, CachedSpecService specService, JobService jobService) { this.retriever = retriever; this.specService = specService; this.jobService = jobService; diff --git a/storage/api/src/main/java/feast/storage/api/retrieval/HistoricalRetriever.java b/storage/api/src/main/java/feast/storage/api/retrieval/HistoricalRetriever.java index 008f1229e09..760eeedc7e5 100644 --- a/storage/api/src/main/java/feast/storage/api/retrieval/HistoricalRetriever.java +++ b/storage/api/src/main/java/feast/storage/api/retrieval/HistoricalRetriever.java @@ -26,7 +26,8 @@ public interface HistoricalRetriever { /** - * Get temporary staging location if applicable. If not applicable to this store, returns an empty string. + * Get temporary staging location if applicable. If not applicable to this store, returns an empty + * string. * * @return staging location uri */