diff --git a/serving/src/main/java/feast/serving/service/OnlineServingService.java b/serving/src/main/java/feast/serving/service/OnlineServingService.java index c6f2178a7f2..3a91a3c955d 100644 --- a/serving/src/main/java/feast/serving/service/OnlineServingService.java +++ b/serving/src/main/java/feast/serving/service/OnlineServingService.java @@ -16,16 +16,13 @@ */ package feast.serving.service; -import static feast.serving.util.Metrics.requestCount; -import static feast.serving.util.Metrics.staleKeyCount; -import static feast.serving.util.RefUtil.generateFeatureStringRef; - import com.google.common.collect.Maps; import com.google.protobuf.Duration; import feast.serving.ServingAPIProto.*; import feast.serving.ServingAPIProto.GetOnlineFeaturesRequest.EntityRow; import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse.FieldValues; import feast.serving.specs.CachedSpecService; +import feast.serving.util.Metrics; import feast.serving.util.RefUtil; import feast.storage.api.retrieval.FeatureSetRequest; import feast.storage.api.retrieval.OnlineRetriever; @@ -65,7 +62,7 @@ public GetFeastServingInfoResponse getFeastServingInfo( /** {@inheritDoc} */ @Override public GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequest request) { - try (Scope scope = tracer.buildSpan("Redis-getOnlineFeatures").startActive(true)) { + try (Scope scope = tracer.buildSpan("getOnlineFeatures").startActive(true)) { GetOnlineFeaturesResponse.Builder getOnlineFeaturesResponseBuilder = GetOnlineFeaturesResponse.newBuilder(); List featureSetRequests = @@ -85,13 +82,11 @@ public GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequest requ List featureRowsForFs = featureRows.get(fsIdx); FeatureSetRequest featureSetRequest = featureSetRequests.get(fsIdx); + String project = featureSetRequest.getSpec().getProject(); + // In order to return values containing the same feature references provided by the user, // we reuse the feature references in the request as the keys in the featureValuesMap - Map featureNames = - featureSetRequest.getFeatureReferences().stream() - .collect( - Collectors.toMap( - FeatureReference::getName, featureReference -> featureReference)); + Map refsByName = featureSetRequest.getFeatureRefsByName(); // Each feature row returned (per feature set request) corresponds to a given entity row. // For each feature row, update the featureValuesMap. @@ -106,36 +101,23 @@ public GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequest requ .parallelStream() .forEach( ref -> { - staleKeyCount - .labels( - featureSetRequest.getSpec().getProject(), - String.format("%s:%d", ref.getName(), ref.getVersion())) - .inc(); + populateStaleKeyCountMetrics(project, ref); featureValuesMap .get(entityRow) .put(RefUtil.generateFeatureStringRef(ref), Value.newBuilder().build()); }); } else { - featureSetRequest - .getFeatureReferences() - .parallelStream() - .forEach( - ref -> - requestCount - .labels( - featureSetRequest.getSpec().getProject(), - String.format("%s:%d", ref.getName(), ref.getVersion())) - .inc()); + populateRequestCountMetrics(featureSetRequest); // Else populate the featureValueMap at this entityRow with the values in the feature // row. featureRow.getFieldsList().stream() - .filter(field -> featureNames.containsKey(field.getName())) + .filter(field -> refsByName.containsKey(field.getName())) .forEach( field -> { - FeatureReference ref = featureNames.get(field.getName()); - String id = generateFeatureStringRef(ref); + FeatureReference ref = refsByName.get(field.getName()); + String id = RefUtil.generateFeatureStringRef(ref); featureValuesMap.get(entityRow).put(id, field.getValue()); }); } @@ -150,6 +132,24 @@ public GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequest requ } } + private void populateStaleKeyCountMetrics(String project, FeatureReference ref) { + Metrics.staleKeyCount + .labels(project, RefUtil.generateFeatureStringRefWithoutProject(ref)) + .inc(); + } + + private void populateRequestCountMetrics(FeatureSetRequest featureSetRequest) { + String project = featureSetRequest.getSpec().getProject(); + featureSetRequest + .getFeatureReferences() + .parallelStream() + .forEach( + ref -> + Metrics.requestCount + .labels(project, RefUtil.generateFeatureStringRefWithoutProject(ref)) + .inc()); + } + @Override public GetBatchFeaturesResponse getBatchFeatures(GetBatchFeaturesRequest getFeaturesRequest) { throw Status.UNIMPLEMENTED.withDescription("Method not implemented").asRuntimeException(); @@ -162,7 +162,8 @@ public GetJobResponse getJob(GetJobRequest getJobRequest) { private boolean isStale( FeatureSetRequest featureSetRequest, EntityRow entityRow, FeatureRow featureRow) { - if (featureSetRequest.getSpec().getMaxAge().equals(Duration.getDefaultInstance())) { + Duration maxAge = featureSetRequest.getSpec().getMaxAge(); + if (maxAge.equals(Duration.getDefaultInstance())) { return false; } long givenTimestamp = entityRow.getEntityTimestamp().getSeconds(); @@ -170,6 +171,6 @@ private boolean isStale( givenTimestamp = System.currentTimeMillis() / 1000; } long timeDifference = givenTimestamp - featureRow.getEventTimestamp().getSeconds(); - return timeDifference > featureSetRequest.getSpec().getMaxAge().getSeconds(); + return timeDifference > maxAge.getSeconds(); } } diff --git a/serving/src/main/java/feast/serving/util/RefUtil.java b/serving/src/main/java/feast/serving/util/RefUtil.java index 74de3e65620..c3bcb0827a2 100644 --- a/serving/src/main/java/feast/serving/util/RefUtil.java +++ b/serving/src/main/java/feast/serving/util/RefUtil.java @@ -28,6 +28,14 @@ public static String generateFeatureStringRef(FeatureReference featureReference) return ref; } + public static String generateFeatureStringRefWithoutProject(FeatureReference featureReference) { + String ref = String.format("%s", featureReference.getName()); + if (featureReference.getVersion() > 0) { + return ref + String.format(":%d", featureReference.getVersion()); + } + return ref; + } + public static String generateFeatureSetStringRef(FeatureSetSpec featureSetSpec) { String ref = String.format("%s/%s", featureSetSpec.getProject(), featureSetSpec.getName()); if (featureSetSpec.getVersion() > 0) { diff --git a/storage/api/src/main/java/feast/storage/api/retrieval/FeatureSetRequest.java b/storage/api/src/main/java/feast/storage/api/retrieval/FeatureSetRequest.java index 27a80a8f4aa..bd3a8bc6912 100644 --- a/storage/api/src/main/java/feast/storage/api/retrieval/FeatureSetRequest.java +++ b/storage/api/src/main/java/feast/storage/api/retrieval/FeatureSetRequest.java @@ -21,6 +21,8 @@ import feast.core.FeatureSetProto.FeatureSetSpec; import feast.serving.ServingAPIProto.FeatureReference; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; @AutoValue public abstract class FeatureSetRequest { @@ -50,4 +52,9 @@ public Builder addFeatureReference(FeatureReference featureReference) { public abstract FeatureSetRequest build(); } + + public Map getFeatureRefsByName() { + return getFeatureReferences().stream() + .collect(Collectors.toMap(FeatureReference::getName, featureReference -> featureReference)); + } }