From 66f979cf9bec76ba17cb18d58d5b446ec5bf304b Mon Sep 17 00:00:00 2001
From: Willem Pienaar <6728866+woop@users.noreply.github.com>
Date: Tue, 18 Feb 2020 11:56:40 +0800
Subject: [PATCH 01/13] Update comments on FeatureRow
(cherry picked from commit 65f2ad7f43450cd8a3f7d64b6a1651a924881008)
---
protos/feast/types/FeatureRow.proto | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/protos/feast/types/FeatureRow.proto b/protos/feast/types/FeatureRow.proto
index 24293c6faa6..c170cd5d502 100644
--- a/protos/feast/types/FeatureRow.proto
+++ b/protos/feast/types/FeatureRow.proto
@@ -36,7 +36,7 @@ message FeatureRow {
google.protobuf.Timestamp event_timestamp = 3;
// Complete reference to the featureSet this featureRow belongs to, in the form of
- // featureSetName:version. This value will be used by the feast ingestion job to filter
+ // /:. This value will be used by the feast ingestion job to filter
// rows, and write the values to the correct tables.
string feature_set = 6;
-}
\ No newline at end of file
+}
From 48a5e94625c25be13cfe14ab5663c6a6c258fbfc Mon Sep 17 00:00:00 2001
From: Willem Pienaar <6728866+woop@users.noreply.github.com>
Date: Tue, 18 Feb 2020 22:30:47 +0800
Subject: [PATCH 02/13] Fix time range bug in basic example
(cherry picked from commit fb39c6cc031086f3d84c893e846ace1363c6bff7)
---
examples/basic/basic.ipynb | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/examples/basic/basic.ipynb b/examples/basic/basic.ipynb
index 94fc82f2ce9..b9893011d97 100644
--- a/examples/basic/basic.ipynb
+++ b/examples/basic/basic.ipynb
@@ -203,7 +203,7 @@
"outputs": [],
"source": [
"days = [datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0).replace(tzinfo=utc) \\\n",
- " - timedelta(day) for day in range(31)]\n",
+ " - timedelta(day) for day in range(3)][::-1]\n",
"\n",
"customers = [1001, 1002, 1003, 1004, 1005]"
]
From 3322dca546f48287d5cffa2fd9f9f6aed585ab54 Mon Sep 17 00:00:00 2001
From: Willem Pienaar <6728866+woop@users.noreply.github.com>
Date: Thu, 20 Feb 2020 16:47:38 +0800
Subject: [PATCH 03/13] Reduce refresh rate of specification refresh in Serving
to 10 seconds (#481)
(cherry picked from commit 887f9e361c0f021e44bc096a2ba6ada1c6ab1a52)
---
.../feast/serving/configuration/SpecServiceConfig.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/serving/src/main/java/feast/serving/configuration/SpecServiceConfig.java b/serving/src/main/java/feast/serving/configuration/SpecServiceConfig.java
index 3c91c2765aa..0b3a2938b8e 100644
--- a/serving/src/main/java/feast/serving/configuration/SpecServiceConfig.java
+++ b/serving/src/main/java/feast/serving/configuration/SpecServiceConfig.java
@@ -35,7 +35,7 @@ public class SpecServiceConfig {
private static final Logger log = org.slf4j.LoggerFactory.getLogger(SpecServiceConfig.class);
private String feastCoreHost;
private int feastCorePort;
- private static final int CACHE_REFRESH_RATE_MINUTES = 1;
+ private static final int CACHE_REFRESH_RATE_SECONDS = 10;
@Autowired
public SpecServiceConfig(FeastProperties feastProperties) {
@@ -51,9 +51,9 @@ public ScheduledExecutorService cachedSpecServiceScheduledExecutorService(
// reload all specs including new ones periodically
scheduledExecutorService.scheduleAtFixedRate(
cachedSpecStorage::scheduledPopulateCache,
- CACHE_REFRESH_RATE_MINUTES,
- CACHE_REFRESH_RATE_MINUTES,
- TimeUnit.MINUTES);
+ CACHE_REFRESH_RATE_SECONDS,
+ CACHE_REFRESH_RATE_SECONDS,
+ TimeUnit.SECONDS);
return scheduledExecutorService;
}
From 3df461e0694933b3a8e2f49633557459c1d458e4 Mon Sep 17 00:00:00 2001
From: Willem Pienaar
Date: Thu, 20 Feb 2020 21:00:51 +0800
Subject: [PATCH 04/13] Expose PosgreSQL port in Docker Compose
(cherry picked from commit aec7979f2986e56d7be525cc6b27a1eeb786d501)
---
infra/docker-compose/docker-compose.yml | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/infra/docker-compose/docker-compose.yml b/infra/docker-compose/docker-compose.yml
index 27d82efc3ca..87d56cbe925 100644
--- a/infra/docker-compose/docker-compose.yml
+++ b/infra/docker-compose/docker-compose.yml
@@ -106,4 +106,6 @@ services:
ZOOKEEPER_CLIENT_PORT: 2181
db:
- image: postgres:12-alpine
\ No newline at end of file
+ image: postgres:12-alpine
+ ports:
+ - "5432:5342"
\ No newline at end of file
From f745d82bf54453c353bf217f5e45c97d402e6587 Mon Sep 17 00:00:00 2001
From: Ches Martin
Date: Tue, 25 Feb 2020 06:08:40 +0700
Subject: [PATCH 05/13] Fail Spotless formatting check before tests execute
(#487)
* Fail formatting check before tests execute
By default, the spotless Maven plugin binds its check goal to the verify
phase (late in the lifecycle, after integration tests). Because we
currently only run `mvn test` for CI, it doesn't proceed as far as
verify so missed formatting is not caught by CI.
This binds the check to an earlier phase, in between test-compile and
test, so that it will fail before `mvn test` but not disrupt your dev
workflow of compiling main and test sources as you work. This strikes a
good compromise on failing fast for code standards without being _too_
nagging.
For the complete lifecycle reference, see:
https://maven.apache.org/guides/introduction/introduction-to-the-lifecycle.html
* Apply spotless formatting
(cherry picked from commit 636354092c3967c4b89d4c345ffc281f9f7c592d)
---
.../test/java/feast/ingestion/ImportJobTest.java | 15 +++++++--------
pom.xml | 10 ++++++++++
.../feast/serving/specs/CachedSpecService.java | 6 +-----
3 files changed, 18 insertions(+), 13 deletions(-)
diff --git a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java
index 58ecae8f045..1148fa40422 100644
--- a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java
+++ b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java
@@ -32,14 +32,12 @@
import feast.core.StoreProto.Store.Subscription;
import feast.ingestion.options.BZip2Compressor;
import feast.ingestion.options.ImportOptions;
-import feast.ingestion.options.OptionByteConverter;
import feast.storage.RedisProto.RedisKey;
import feast.test.TestUtil;
import feast.test.TestUtil.LocalKafka;
import feast.test.TestUtil.LocalRedis;
import feast.types.FeatureRowProto.FeatureRow;
import feast.types.ValueProto.ValueType.Enum;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -51,7 +49,6 @@
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.joda.time.Duration;
import org.junit.AfterClass;
@@ -166,11 +163,13 @@ public void runPipeline_ShouldWriteToRedisCorrectlyGivenValidSpecAndFeatureRow()
.build();
ImportOptions options = PipelineOptionsFactory.create().as(ImportOptions.class);
- BZip2Compressor compressor = new BZip2Compressor<>(option -> {
- JsonFormat.Printer printer =
- JsonFormat.printer().omittingInsignificantWhitespace().printingEnumsAsInts();
- return printer.print(option).getBytes();
- });
+ BZip2Compressor compressor =
+ new BZip2Compressor<>(
+ option -> {
+ JsonFormat.Printer printer =
+ JsonFormat.printer().omittingInsignificantWhitespace().printingEnumsAsInts();
+ return printer.print(option).getBytes();
+ });
options.setFeatureSetJson(compressor.compress(spec));
options.setStoreJson(Collections.singletonList(JsonFormat.printer().print(redis)));
options.setProject("");
diff --git a/pom.xml b/pom.xml
index 3ba6a592cfa..df0b4a85bd8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -372,6 +372,16 @@
+
+
+
+ spotless-check
+ process-test-classes
+
+ check
+
+
+
org.apache.maven.plugins
diff --git a/serving/src/main/java/feast/serving/specs/CachedSpecService.java b/serving/src/main/java/feast/serving/specs/CachedSpecService.java
index 1184f6da95a..35119589b27 100644
--- a/serving/src/main/java/feast/serving/specs/CachedSpecService.java
+++ b/serving/src/main/java/feast/serving/specs/CachedSpecService.java
@@ -195,11 +195,7 @@ private Map getFeatureToFeatureSetMapping(
HashMap mapping = new HashMap<>();
featureSets.values().stream()
- .collect(
- groupingBy(
- featureSet ->
- Pair.of(
- featureSet.getProject(), featureSet.getName())))
+ .collect(groupingBy(featureSet -> Pair.of(featureSet.getProject(), featureSet.getName())))
.forEach(
(group, groupedFeatureSets) -> {
groupedFeatureSets =
From 92c19636faee62cb49085ed9a955adf11f8e18a5 Mon Sep 17 00:00:00 2001
From: David Heryanto
Date: Tue, 25 Feb 2020 10:16:40 +0800
Subject: [PATCH 06/13] Fix fastavro version used in Feast to avoid Timestamp
delta error (#490)
* Fix fastavro version used in feast to 0.22.9
* Print python packages version used when e2e test fails
(cherry picked from commit 7586da644c16c2d96f63eee4783d600b013f502d)
---
.prow/scripts/test-end-to-end-batch.sh | 9 +++++++++
.prow/scripts/test-end-to-end.sh | 9 +++++++++
sdk/python/setup.py | 4 +++-
3 files changed, 21 insertions(+), 1 deletion(-)
diff --git a/.prow/scripts/test-end-to-end-batch.sh b/.prow/scripts/test-end-to-end-batch.sh
index ac312391d55..4d1b1d2ecd7 100755
--- a/.prow/scripts/test-end-to-end-batch.sh
+++ b/.prow/scripts/test-end-to-end-batch.sh
@@ -235,6 +235,15 @@ set +e
pytest bq-batch-retrieval.py --junitxml=${LOGS_ARTIFACT_PATH}/python-sdk-test-report.xml
TEST_EXIT_CODE=$?
+if [[ ${TEST_EXIT_CODE} != 0 ]]; then
+ echo "[DEBUG] Printing logs"
+ ls -ltrh /var/log/feast*
+ cat /var/log/feast-serving-warehouse.log /var/log/feast-core.log
+
+ echo "[DEBUG] Printing Python packages list"
+ pip list
+fi
+
cd ${ORIGINAL_DIR}
exit ${TEST_EXIT_CODE}
diff --git a/.prow/scripts/test-end-to-end.sh b/.prow/scripts/test-end-to-end.sh
index cc65968ca22..45cb9c82ae8 100755
--- a/.prow/scripts/test-end-to-end.sh
+++ b/.prow/scripts/test-end-to-end.sh
@@ -225,5 +225,14 @@ set +e
pytest basic-ingest-redis-serving.py --junitxml=${LOGS_ARTIFACT_PATH}/python-sdk-test-report.xml
TEST_EXIT_CODE=$?
+if [[ ${TEST_EXIT_CODE} != 0 ]]; then
+ echo "[DEBUG] Printing logs"
+ ls -ltrh /var/log/feast*
+ cat /var/log/feast-serving-online.log /var/log/feast-core.log
+
+ echo "[DEBUG] Printing Python packages list"
+ pip list
+fi
+
cd ${ORIGINAL_DIR}
exit ${TEST_EXIT_CODE}
diff --git a/sdk/python/setup.py b/sdk/python/setup.py
index d0b37ad9419..3fc77540c02 100644
--- a/sdk/python/setup.py
+++ b/sdk/python/setup.py
@@ -37,7 +37,9 @@
"pandavro==1.5.*",
"protobuf>=3.10",
"PyYAML==5.1.*",
- "fastavro==0.*",
+ # fastavro 0.22.10 and newer will throw this error for e2e batch test:
+ # TypeError: Timestamp subtraction must have the same timezones or no timezones
+ "fastavro==0.22.9",
"kafka-python==1.*",
"tabulate==0.8.*",
"toml==0.10.*",
From bbc16c1e5e840f6acc87c323ca9793192f2adbca Mon Sep 17 00:00:00 2001
From: Julio Anthony Leonard
Date: Tue, 25 Feb 2020 12:04:40 +0700
Subject: [PATCH 07/13] Remove transaction from ingestion redis (#480)
(cherry picked from commit c3591edc37dbb5afe2a5058ffbf0a2cc03f59775)
---
.../src/main/java/feast/store/serving/redis/RedisCustomIO.java | 2 --
1 file changed, 2 deletions(-)
diff --git a/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java b/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java
index 8c142b66c93..8541baaffc3 100644
--- a/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java
+++ b/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java
@@ -238,7 +238,6 @@ private void executeBatch() throws Exception {
new Retriable() {
@Override
public void execute() {
- pipeline.multi();
mutations.forEach(
mutation -> {
writeRecord(mutation);
@@ -246,7 +245,6 @@ public void execute() {
pipeline.pexpire(mutation.getKey(), mutation.getExpiryMillis());
}
});
- pipeline.exec();
pipeline.sync();
mutations.clear();
}
From e52f86c5df252ddb8e52a2b44cc8cdf2e1d3d9e6 Mon Sep 17 00:00:00 2001
From: David Heryanto
Date: Tue, 25 Feb 2020 15:45:40 +0800
Subject: [PATCH 08/13] Extend WriteMetricsTransform in Ingestion to write
feature value stats to StatsD (#486)
* Extend WriteMetricsTransform to write feature value stats to StatsD
* Apply mvn spotless
* Catch all exception not just StatsDClientException during init
Since there are other exception like UnknownHostException that can be thrown and we want to know such error. Also change the log level to error because so it's not normal for client to fail to be created"
* Change log level due to invalid feature set ref to error (previously warn)
On 2nd thought, this should constitute an error not a warning
* Apply maven spotless to metric transform codes
(cherry picked from commit 5508c9230c7359ceb761c0b71ac9924e0423fbb4)
---
.prow/scripts/test-end-to-end.sh | 4 +
ingestion/pom.xml | 7 +
.../ingestion/options/ImportOptions.java | 10 +
.../metrics/WriteFeatureValueMetricsDoFn.java | 311 +++++++++++++++++
.../metrics/WriteMetricsTransform.java | 41 +++
.../metrics/WriteRowMetricsDoFn.java | 14 +-
.../WriteFeatureValueMetricsDoFnTest.java | 315 ++++++++++++++++++
.../WriteFeatureValueMetricsDoFnTest.README | 9 +
.../WriteFeatureValueMetricsDoFnTest.input | 4 +
.../WriteFeatureValueMetricsDoFnTest.output | 66 ++++
10 files changed, 774 insertions(+), 7 deletions(-)
create mode 100644 ingestion/src/main/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFn.java
create mode 100644 ingestion/src/test/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFnTest.java
create mode 100644 ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.README
create mode 100644 ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.input
create mode 100644 ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.output
diff --git a/.prow/scripts/test-end-to-end.sh b/.prow/scripts/test-end-to-end.sh
index 45cb9c82ae8..b9d7fa90882 100755
--- a/.prow/scripts/test-end-to-end.sh
+++ b/.prow/scripts/test-end-to-end.sh
@@ -66,6 +66,7 @@ sleep 20
tail -n10 /var/log/kafka.log
kafkacat -b localhost:9092 -L
+if [[ ${SKIP_BUILD_JARS} != "true" ]]; then
echo "
============================================================
Building jars for Feast
@@ -81,6 +82,9 @@ mvn --quiet --batch-mode --define skipTests=true clean package
ls -lh core/target/*jar
ls -lh serving/target/*jar
+else
+ echo "[DEBUG] Skipping building jars"
+fi
echo "
============================================================
diff --git a/ingestion/pom.xml b/ingestion/pom.xml
index c829674a64d..001da1a1453 100644
--- a/ingestion/pom.xml
+++ b/ingestion/pom.xml
@@ -248,5 +248,12 @@
2.8.1
+
+
+ org.apache.commons
+ commons-math3
+ 3.6.1
+
+
diff --git a/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java b/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java
index 6afdd80dd72..c1bdcd5fd17 100644
--- a/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java
+++ b/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java
@@ -26,6 +26,7 @@
/** Options passed to Beam to influence the job's execution environment */
public interface ImportOptions extends PipelineOptions, DataflowPipelineOptions, DirectOptions {
+
@Required
@Description(
"JSON string representation of the FeatureSet that the import job will process, in BZip2 binary format."
@@ -83,4 +84,13 @@ public interface ImportOptions extends PipelineOptions, DataflowPipelineOptions,
int getStatsdPort();
void setStatsdPort(int StatsdPort);
+
+ @Description(
+ "Fixed window size in seconds (default 30) to apply before aggregation of numerical value of features"
+ + "and writing the aggregated value to StatsD. Refer to feast.ingestion.transform.metrics.WriteFeatureValueMetricsDoFn"
+ + "for details on the metric names and types.")
+ @Default.Integer(30)
+ int getWindowSizeInSecForFeatureValueMetric();
+
+ void setWindowSizeInSecForFeatureValueMetric(int seconds);
}
diff --git a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFn.java b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFn.java
new file mode 100644
index 00000000000..8574d2414c3
--- /dev/null
+++ b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFn.java
@@ -0,0 +1,311 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.ingestion.transform.metrics;
+
+import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.FEATURE_SET_NAME_TAG_KEY;
+import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.FEATURE_SET_PROJECT_TAG_KEY;
+import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.FEATURE_SET_VERSION_TAG_KEY;
+import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.FEATURE_TAG_KEY;
+import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.INGESTION_JOB_NAME_KEY;
+import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.METRIC_PREFIX;
+import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.STORE_TAG_KEY;
+
+import com.google.auto.value.AutoValue;
+import com.timgroup.statsd.NonBlockingStatsDClient;
+import com.timgroup.statsd.StatsDClient;
+import feast.types.FeatureRowProto.FeatureRow;
+import feast.types.FieldProto.Field;
+import feast.types.ValueProto.Value;
+import java.util.ArrayList;
+import java.util.DoubleSummaryStatistics;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.commons.math3.stat.descriptive.rank.Percentile;
+import org.slf4j.Logger;
+
+/**
+ * WriteFeatureValueMetricsDoFn accepts key value of FeatureSetRef(str) to FeatureRow(List) and
+ * writes a histogram of the numerical values of each feature to StatsD.
+ *
+ *
The histogram of the numerical values is represented as the following in StatsD:
+ *
+ *
+ * - gauge of feature_value_min
+ *
- gauge of feature_value_max
+ *
- gauge of feature_value_mean
+ *
- gauge of feature_value_percentile_50
+ *
- gauge of feature_value_percentile_90
+ *
- gauge of feature_value_percentile_95
+ *
+ *
+ * StatsD timing/histogram metric type is not used since it does not support negative values.
+ */
+@AutoValue
+public abstract class WriteFeatureValueMetricsDoFn
+ extends DoFn>, Void> {
+
+ abstract String getStoreName();
+
+ abstract String getStatsdHost();
+
+ abstract int getStatsdPort();
+
+ static Builder newBuilder() {
+ return new AutoValue_WriteFeatureValueMetricsDoFn.Builder();
+ }
+
+ @AutoValue.Builder
+ abstract static class Builder {
+
+ abstract Builder setStoreName(String storeName);
+
+ abstract Builder setStatsdHost(String statsdHost);
+
+ abstract Builder setStatsdPort(int statsdPort);
+
+ abstract WriteFeatureValueMetricsDoFn build();
+ }
+
+ private static final Logger log =
+ org.slf4j.LoggerFactory.getLogger(WriteFeatureValueMetricsDoFn.class);
+ private StatsDClient statsDClient;
+ public static String GAUGE_NAME_FEATURE_VALUE_MIN = "feature_value_min";
+ public static String GAUGE_NAME_FEATURE_VALUE_MAX = "feature_value_max";
+ public static String GAUGE_NAME_FEATURE_VALUE_MEAN = "feature_value_mean";
+ public static String GAUGE_NAME_FEATURE_VALUE_PERCENTILE_50 = "feature_value_percentile_50";
+ public static String GAUGE_NAME_FEATURE_VALUE_PERCENTILE_90 = "feature_value_percentile_90";
+ public static String GAUGE_NAME_FEATURE_VALUE_PERCENTILE_95 = "feature_value_percentile_95";
+
+ @Setup
+ public void setup() {
+ // Note that exception may be thrown during StatsD client instantiation but no exception
+ // will be thrown when sending metrics (mimicking the UDP protocol behaviour).
+ // https://jar-download.com/artifacts/com.datadoghq/java-dogstatsd-client/2.1.1/documentation
+ // https://github.com/DataDog/java-dogstatsd-client#unix-domain-socket-support
+ try {
+ statsDClient = new NonBlockingStatsDClient(METRIC_PREFIX, getStatsdHost(), getStatsdPort());
+ } catch (Exception e) {
+ log.error("StatsD client cannot be started: " + e.getMessage());
+ }
+ }
+
+ @Teardown
+ public void tearDown() {
+ if (statsDClient != null) {
+ statsDClient.close();
+ }
+ }
+
+ @ProcessElement
+ public void processElement(
+ ProcessContext context,
+ @Element KV> featureSetRefToFeatureRows) {
+ if (statsDClient == null) {
+ return;
+ }
+
+ String featureSetRef = featureSetRefToFeatureRows.getKey();
+ if (featureSetRef == null) {
+ return;
+ }
+ String[] colonSplits = featureSetRef.split(":");
+ if (colonSplits.length != 2) {
+ log.error(
+ "Skip writing feature value metrics because the feature set reference '{}' does not"
+ + "follow the required format /:",
+ featureSetRef);
+ return;
+ }
+ String[] slashSplits = colonSplits[0].split("/");
+ if (slashSplits.length != 2) {
+ log.error(
+ "Skip writing feature value metrics because the feature set reference '{}' does not"
+ + "follow the required format /:",
+ featureSetRef);
+ return;
+ }
+ String projectName = slashSplits[0];
+ String featureSetName = slashSplits[1];
+ String version = colonSplits[1];
+
+ Map featureNameToStats = new HashMap<>();
+ Map> featureNameToValues = new HashMap<>();
+ for (FeatureRow featureRow : featureSetRefToFeatureRows.getValue()) {
+ for (Field field : featureRow.getFieldsList()) {
+ updateStats(featureNameToStats, featureNameToValues, field);
+ }
+ }
+
+ for (Entry entry : featureNameToStats.entrySet()) {
+ String featureName = entry.getKey();
+ DoubleSummaryStatistics stats = entry.getValue();
+ String[] tags = {
+ STORE_TAG_KEY + ":" + getStoreName(),
+ FEATURE_SET_PROJECT_TAG_KEY + ":" + projectName,
+ FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
+ FEATURE_SET_VERSION_TAG_KEY + ":" + version,
+ FEATURE_TAG_KEY + ":" + featureName,
+ INGESTION_JOB_NAME_KEY + ":" + context.getPipelineOptions().getJobName()
+ };
+
+ // stats can return non finite values when there is no element
+ // or there is an element that is not a number. Metric should only be sent for finite values.
+ if (Double.isFinite(stats.getMin())) {
+ if (stats.getMin() < 0) {
+ // StatsD gauge will asssign a delta instead of the actual value, if there is a sign in
+ // the value. E.g. if the value is negative, a delta will be assigned. For this reason,
+ // the gauge value is set to zero beforehand.
+ // https://github.com/statsd/statsd/blob/master/docs/metric_types.md#gauges
+ statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_MIN, 0, tags);
+ }
+ statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_MIN, stats.getMin(), tags);
+ }
+ if (Double.isFinite(stats.getMax())) {
+ if (stats.getMax() < 0) {
+ statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_MAX, 0, tags);
+ }
+ statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_MAX, stats.getMax(), tags);
+ }
+ if (Double.isFinite(stats.getAverage())) {
+ if (stats.getAverage() < 0) {
+ statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_MEAN, 0, tags);
+ }
+ statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_MEAN, stats.getAverage(), tags);
+ }
+
+ // For percentile calculation, Percentile class from commons-math3 from Apache is used.
+ // Percentile requires double[], hence the conversion below.
+ if (!featureNameToValues.containsKey(featureName)) {
+ continue;
+ }
+ List valueList = featureNameToValues.get(featureName);
+ if (valueList == null || valueList.size() < 1) {
+ continue;
+ }
+ double[] values = new double[valueList.size()];
+ for (int i = 0; i < values.length; i++) {
+ values[i] = valueList.get(i);
+ }
+
+ double p50 = new Percentile().evaluate(values, 50);
+ if (p50 < 0) {
+ statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_PERCENTILE_50, 0, tags);
+ }
+ statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_PERCENTILE_50, p50, tags);
+
+ double p90 = new Percentile().evaluate(values, 90);
+ if (p90 < 0) {
+ statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_PERCENTILE_90, 0, tags);
+ }
+ statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_PERCENTILE_90, p90, tags);
+
+ double p95 = new Percentile().evaluate(values, 95);
+ if (p95 < 0) {
+ statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_PERCENTILE_95, 0, tags);
+ }
+ statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_PERCENTILE_95, p95, tags);
+ }
+ }
+
+ // Update stats and values array for the feature represented by the field.
+ // If the field contains non-numerical or non-boolean value, the stats and values array
+ // won't get updated because we are only concerned with numerical value in metrics data.
+ // For boolean value, true and false are treated as numerical value of 1 of 0 respectively.
+ private void updateStats(
+ Map featureNameToStats,
+ Map> featureNameToValues,
+ Field field) {
+ if (featureNameToStats == null || featureNameToValues == null || field == null) {
+ return;
+ }
+
+ String featureName = field.getName();
+ if (!featureNameToStats.containsKey(featureName)) {
+ featureNameToStats.put(featureName, new DoubleSummaryStatistics());
+ }
+ if (!featureNameToValues.containsKey(featureName)) {
+ featureNameToValues.put(featureName, new ArrayList<>());
+ }
+
+ Value value = field.getValue();
+ DoubleSummaryStatistics stats = featureNameToStats.get(featureName);
+ List values = featureNameToValues.get(featureName);
+
+ switch (value.getValCase()) {
+ case INT32_VAL:
+ stats.accept(value.getInt32Val());
+ values.add(((double) value.getInt32Val()));
+ break;
+ case INT64_VAL:
+ stats.accept(value.getInt64Val());
+ values.add((double) value.getInt64Val());
+ break;
+ case DOUBLE_VAL:
+ stats.accept(value.getDoubleVal());
+ values.add(value.getDoubleVal());
+ break;
+ case FLOAT_VAL:
+ stats.accept(value.getFloatVal());
+ values.add((double) value.getFloatVal());
+ break;
+ case BOOL_VAL:
+ stats.accept(value.getBoolVal() ? 1 : 0);
+ values.add(value.getBoolVal() ? 1d : 0d);
+ break;
+ case INT32_LIST_VAL:
+ for (Integer val : value.getInt32ListVal().getValList()) {
+ stats.accept(val);
+ values.add(((double) val));
+ }
+ break;
+ case INT64_LIST_VAL:
+ for (Long val : value.getInt64ListVal().getValList()) {
+ stats.accept(val);
+ values.add(((double) val));
+ }
+ break;
+ case DOUBLE_LIST_VAL:
+ for (Double val : value.getDoubleListVal().getValList()) {
+ stats.accept(val);
+ values.add(val);
+ }
+ break;
+ case FLOAT_LIST_VAL:
+ for (Float val : value.getFloatListVal().getValList()) {
+ stats.accept(val);
+ values.add(((double) val));
+ }
+ break;
+ case BOOL_LIST_VAL:
+ for (Boolean val : value.getBoolListVal().getValList()) {
+ stats.accept(val ? 1 : 0);
+ values.add(val ? 1d : 0d);
+ }
+ break;
+ case BYTES_VAL:
+ case BYTES_LIST_VAL:
+ case STRING_VAL:
+ case STRING_LIST_VAL:
+ case VAL_NOT_SET:
+ default:
+ }
+ }
+}
diff --git a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java
index 43f314aa861..10322ac812f 100644
--- a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java
+++ b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java
@@ -21,11 +21,16 @@
import feast.ingestion.values.FailedElement;
import feast.types.FeatureRowProto.FeatureRow;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Duration;
@AutoValue
public abstract class WriteMetricsTransform extends PTransform {
@@ -79,6 +84,42 @@ public PDone expand(PCollectionTuple input) {
.setStoreName(getStoreName())
.build()));
+ // 1. Apply a fixed window
+ // 2. Group feature row by feature set reference
+ // 3. Calculate min, max, mean, percentiles of numerical values of features in the window
+ // and
+ // 4. Send the aggregate value to StatsD metric collector.
+ //
+ // NOTE: window is applied here so the metric collector will not be overwhelmed with
+ // metrics data. And for metric data, only statistic of the values are usually required
+ // vs the actual values.
+ input
+ .get(getSuccessTag())
+ .apply(
+ "FixedWindow",
+ Window.into(
+ FixedWindows.of(
+ Duration.standardSeconds(
+ options.getWindowSizeInSecForFeatureValueMetric()))))
+ .apply(
+ "ConvertTo_FeatureSetRefToFeatureRow",
+ ParDo.of(
+ new DoFn>() {
+ @ProcessElement
+ public void processElement(ProcessContext c, @Element FeatureRow featureRow) {
+ c.output(KV.of(featureRow.getFeatureSet(), featureRow));
+ }
+ }))
+ .apply("GroupByFeatureSetRef", GroupByKey.create())
+ .apply(
+ "WriteFeatureValueMetrics",
+ ParDo.of(
+ WriteFeatureValueMetricsDoFn.newBuilder()
+ .setStatsdHost(options.getStatsdHost())
+ .setStatsdPort(options.getStatsdPort())
+ .setStoreName(getStoreName())
+ .build()));
+
return PDone.in(input.getPipeline());
case "none":
default:
diff --git a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java
index db2d1acd6d8..2cd1ee94ecc 100644
--- a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java
+++ b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java
@@ -31,13 +31,13 @@ public abstract class WriteRowMetricsDoFn extends DoFn {
private static final Logger log = org.slf4j.LoggerFactory.getLogger(WriteRowMetricsDoFn.class);
- private final String METRIC_PREFIX = "feast_ingestion";
- private final String STORE_TAG_KEY = "feast_store";
- private final String FEATURE_SET_PROJECT_TAG_KEY = "feast_project_name";
- private final String FEATURE_SET_NAME_TAG_KEY = "feast_featureSet_name";
- private final String FEATURE_SET_VERSION_TAG_KEY = "feast_featureSet_version";
- private final String FEATURE_TAG_KEY = "feast_feature_name";
- private final String INGESTION_JOB_NAME_KEY = "ingestion_job_name";
+ public static final String METRIC_PREFIX = "feast_ingestion";
+ public static final String STORE_TAG_KEY = "feast_store";
+ public static final String FEATURE_SET_PROJECT_TAG_KEY = "feast_project_name";
+ public static final String FEATURE_SET_NAME_TAG_KEY = "feast_featureSet_name";
+ public static final String FEATURE_SET_VERSION_TAG_KEY = "feast_featureSet_version";
+ public static final String FEATURE_TAG_KEY = "feast_feature_name";
+ public static final String INGESTION_JOB_NAME_KEY = "ingestion_job_name";
public abstract String getStoreName();
diff --git a/ingestion/src/test/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFnTest.java b/ingestion/src/test/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFnTest.java
new file mode 100644
index 00000000000..8f0adf40168
--- /dev/null
+++ b/ingestion/src/test/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFnTest.java
@@ -0,0 +1,315 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.ingestion.transform.metrics;
+
+import static org.junit.Assert.fail;
+
+import com.google.protobuf.ByteString;
+import feast.types.FeatureRowProto.FeatureRow;
+import feast.types.FeatureRowProto.FeatureRow.Builder;
+import feast.types.FieldProto.Field;
+import feast.types.ValueProto.BoolList;
+import feast.types.ValueProto.BytesList;
+import feast.types.ValueProto.DoubleList;
+import feast.types.ValueProto.FloatList;
+import feast.types.ValueProto.Int32List;
+import feast.types.ValueProto.Int64List;
+import feast.types.ValueProto.StringList;
+import feast.types.ValueProto.Value;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.SocketException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class WriteFeatureValueMetricsDoFnTest {
+
+ @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+ private static final int STATSD_SERVER_PORT = 17254;
+ private final DummyStatsDServer statsDServer = new DummyStatsDServer(STATSD_SERVER_PORT);
+
+ @Test
+ public void shouldSendCorrectStatsDMetrics() throws IOException, InterruptedException {
+ PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
+ pipelineOptions.setJobName("job");
+
+ Map> input =
+ readTestInput("feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.input");
+ List expectedLines =
+ readTestOutput("feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.output");
+
+ pipeline
+ .apply(Create.of(input))
+ .apply(
+ ParDo.of(
+ WriteFeatureValueMetricsDoFn.newBuilder()
+ .setStatsdHost("localhost")
+ .setStatsdPort(STATSD_SERVER_PORT)
+ .setStoreName("store")
+ .build()));
+ pipeline.run(pipelineOptions).waitUntilFinish();
+ // Wait until StatsD has finished processed all messages, 3 sec is a reasonable duration
+ // based on empirical testing.
+ Thread.sleep(3000);
+
+ List actualLines = statsDServer.messagesReceived();
+ for (String expected : expectedLines) {
+ boolean matched = false;
+ for (String actual : actualLines) {
+ if (actual.equals(expected)) {
+ matched = true;
+ break;
+ }
+ }
+ if (!matched) {
+ System.out.println("Print actual metrics output for debugging:");
+ for (String line : actualLines) {
+ System.out.println(line);
+ }
+ fail(String.format("Expected StatsD metric not found:\n%s", expected));
+ }
+ }
+ }
+
+ // Test utility method to read expected StatsD metrics output from a text file.
+ @SuppressWarnings("SameParameterValue")
+ private List readTestOutput(String path) throws IOException {
+ URL url = Thread.currentThread().getContextClassLoader().getResource(path);
+ if (url == null) {
+ throw new IllegalArgumentException(
+ "cannot read test data, path contains null url. Path: " + path);
+ }
+ List lines = new ArrayList<>();
+ try (BufferedReader reader = Files.newBufferedReader(Paths.get(url.getPath()))) {
+ String line = reader.readLine();
+ while (line != null) {
+ if (line.trim().length() > 1) {
+ lines.add(line);
+ }
+ line = reader.readLine();
+ }
+ }
+ return lines;
+ }
+
+ // Test utility method to create test feature row data from a text file.
+ @SuppressWarnings("SameParameterValue")
+ private Map> readTestInput(String path) throws IOException {
+ Map> data = new HashMap<>();
+ URL url = Thread.currentThread().getContextClassLoader().getResource(path);
+ if (url == null) {
+ throw new IllegalArgumentException(
+ "cannot read test data, path contains null url. Path: " + path);
+ }
+ List lines = new ArrayList<>();
+ try (BufferedReader reader = Files.newBufferedReader(Paths.get(url.getPath()))) {
+ String line = reader.readLine();
+ while (line != null) {
+ lines.add(line);
+ line = reader.readLine();
+ }
+ }
+ List colNames = new ArrayList<>();
+ for (String line : lines) {
+ if (line.strip().length() < 1) {
+ continue;
+ }
+ String[] splits = line.split(",");
+ colNames.addAll(Arrays.asList(splits));
+
+ if (line.startsWith("featuresetref")) {
+ // Header line
+ colNames.addAll(Arrays.asList(splits).subList(1, splits.length));
+ continue;
+ }
+
+ Builder featureRowBuilder = FeatureRow.newBuilder();
+ for (int i = 0; i < splits.length; i++) {
+ String colVal = splits[i].strip();
+ if (i == 0) {
+ featureRowBuilder.setFeatureSet(colVal);
+ continue;
+ }
+ String colName = colNames.get(i);
+ Field.Builder fieldBuilder = Field.newBuilder().setName(colName);
+ if (!colVal.isEmpty()) {
+ switch (colName) {
+ case "int32":
+ fieldBuilder.setValue(Value.newBuilder().setInt32Val((Integer.parseInt(colVal))));
+ break;
+ case "int64":
+ fieldBuilder.setValue(Value.newBuilder().setInt64Val((Long.parseLong(colVal))));
+ break;
+ case "double":
+ fieldBuilder.setValue(Value.newBuilder().setDoubleVal((Double.parseDouble(colVal))));
+ break;
+ case "float":
+ fieldBuilder.setValue(Value.newBuilder().setFloatVal((Float.parseFloat(colVal))));
+ break;
+ case "bool":
+ fieldBuilder.setValue(Value.newBuilder().setBoolVal((Boolean.parseBoolean(colVal))));
+ break;
+ case "int32list":
+ List int32List = new ArrayList<>();
+ for (String val : colVal.split("\\|")) {
+ int32List.add(Integer.parseInt(val));
+ }
+ fieldBuilder.setValue(
+ Value.newBuilder().setInt32ListVal(Int32List.newBuilder().addAllVal(int32List)));
+ break;
+ case "int64list":
+ List int64list = new ArrayList<>();
+ for (String val : colVal.split("\\|")) {
+ int64list.add(Long.parseLong(val));
+ }
+ fieldBuilder.setValue(
+ Value.newBuilder().setInt64ListVal(Int64List.newBuilder().addAllVal(int64list)));
+ break;
+ case "doublelist":
+ List doubleList = new ArrayList<>();
+ for (String val : colVal.split("\\|")) {
+ doubleList.add(Double.parseDouble(val));
+ }
+ fieldBuilder.setValue(
+ Value.newBuilder()
+ .setDoubleListVal(DoubleList.newBuilder().addAllVal(doubleList)));
+ break;
+ case "floatlist":
+ List floatList = new ArrayList<>();
+ for (String val : colVal.split("\\|")) {
+ floatList.add(Float.parseFloat(val));
+ }
+ fieldBuilder.setValue(
+ Value.newBuilder().setFloatListVal(FloatList.newBuilder().addAllVal(floatList)));
+ break;
+ case "boollist":
+ List boolList = new ArrayList<>();
+ for (String val : colVal.split("\\|")) {
+ boolList.add(Boolean.parseBoolean(val));
+ }
+ fieldBuilder.setValue(
+ Value.newBuilder().setBoolListVal(BoolList.newBuilder().addAllVal(boolList)));
+ break;
+ case "bytes":
+ fieldBuilder.setValue(
+ Value.newBuilder().setBytesVal(ByteString.copyFromUtf8("Dummy")));
+ break;
+ case "byteslist":
+ fieldBuilder.setValue(
+ Value.newBuilder().setBytesListVal(BytesList.getDefaultInstance()));
+ break;
+ case "string":
+ fieldBuilder.setValue(Value.newBuilder().setStringVal("Dummy"));
+ break;
+ case "stringlist":
+ fieldBuilder.setValue(
+ Value.newBuilder().setStringListVal(StringList.getDefaultInstance()));
+ break;
+ }
+ }
+ featureRowBuilder.addFields(fieldBuilder);
+ }
+
+ if (!data.containsKey(featureRowBuilder.getFeatureSet())) {
+ data.put(featureRowBuilder.getFeatureSet(), new ArrayList<>());
+ }
+ List featureRowsByFeatureSetRef = data.get(featureRowBuilder.getFeatureSet());
+ featureRowsByFeatureSetRef.add(featureRowBuilder.build());
+ }
+
+ // Convert List to Iterable to match the function signature in
+ // WriteFeatureValueMetricsDoFn
+ Map> dataWithIterable = new HashMap<>();
+ for (Entry> entrySet : data.entrySet()) {
+ String key = entrySet.getKey();
+ Iterable value = entrySet.getValue();
+ dataWithIterable.put(key, value);
+ }
+ return dataWithIterable;
+ }
+
+ // Modified version of
+ // https://github.com/tim-group/java-statsd-client/blob/master/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java
+ @SuppressWarnings("CatchMayIgnoreException")
+ private static final class DummyStatsDServer {
+
+ private final List messagesReceived = new ArrayList();
+ private final DatagramSocket server;
+
+ public DummyStatsDServer(int port) {
+ try {
+ server = new DatagramSocket(port);
+ } catch (SocketException e) {
+ throw new IllegalStateException(e);
+ }
+ new Thread(
+ () -> {
+ try {
+ while (true) {
+ final DatagramPacket packet = new DatagramPacket(new byte[65535], 65535);
+ server.receive(packet);
+ messagesReceived.add(
+ new String(packet.getData(), StandardCharsets.UTF_8).trim() + "\n");
+ Thread.sleep(50);
+ }
+
+ } catch (Exception e) {
+ }
+ })
+ .start();
+ }
+
+ public void stop() {
+ server.close();
+ }
+
+ public void waitForMessage() {
+ while (messagesReceived.isEmpty()) {
+ try {
+ Thread.sleep(50L);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+
+ public List messagesReceived() {
+ List out = new ArrayList<>();
+ for (String msg : messagesReceived) {
+ String[] lines = msg.split("\n");
+ out.addAll(Arrays.asList(lines));
+ }
+ return out;
+ }
+ }
+}
diff --git a/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.README b/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.README
new file mode 100644
index 00000000000..3c8759d1702
--- /dev/null
+++ b/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.README
@@ -0,0 +1,9 @@
+WriteFeatureValueMetricsDoFnTest.input file contains data that can be read by test utility
+into map of FeatureSetRef -> [FeatureRow]. In the first row, the cell value corresponds to the
+field name in the FeatureRow. This should not be changed as the test utility derives the value
+type from this name. Empty value in the cell is a value that is not set. For list type, the values
+of different element is separated by the '|' character.
+
+WriteFeatureValueMetricsDoFnTest.output file contains lines of expected StatsD metrics that should
+be sent when WriteFeatureValueMetricsDoFn runs. It can be checked against the actual outputted
+StatsD metrics to test for correctness.
diff --git a/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.input b/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.input
new file mode 100644
index 00000000000..d2985711cee
--- /dev/null
+++ b/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.input
@@ -0,0 +1,4 @@
+featuresetref,int32,int64,double,float,bool,int32list,int64list,doublelist,floatlist,boollist,bytes,byteslist,string,stringlist
+project/featureset:1,1,5,8,5,true,1|4|3,5|1|12,5|7|3,-2.0,true|false,,,,
+project/featureset:1,5,-10,8,10.0,true,1|12|5,,,-1.0|-3.0,false|true,,,,
+project/featureset:1,6,-4,8,0.0,true,2,2|5,,,true|false,,,,
\ No newline at end of file
diff --git a/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.output b/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.output
new file mode 100644
index 00000000000..63bc7bbfa4e
--- /dev/null
+++ b/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.output
@@ -0,0 +1,66 @@
+feast_ingestion.feature_value_min:1|g|#ingestion_job_name:job,feast_feature_name:int32,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_max:6|g|#ingestion_job_name:job,feast_feature_name:int32,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_mean:4|g|#ingestion_job_name:job,feast_feature_name:int32,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_percentile_50:5|g|#ingestion_job_name:job,feast_feature_name:int32,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_percentile_90:6|g|#ingestion_job_name:job,feast_feature_name:int32,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+
+feast_ingestion.feature_value_min:0|g|#ingestion_job_name:job,feast_feature_name:int64,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_min:-10|g|#ingestion_job_name:job,feast_feature_name:int64,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_max:5|g|#ingestion_job_name:job,feast_feature_name:int64,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_mean:0|g|#ingestion_job_name:job,feast_feature_name:int64,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_mean:-3|g|#ingestion_job_name:job,feast_feature_name:int64,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_percentile_50:-4|g|#ingestion_job_name:job,feast_feature_name:int64,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_percentile_90:5|g|#ingestion_job_name:job,feast_feature_name:int64,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+
+feast_ingestion.feature_value_min:8|g|#ingestion_job_name:job,feast_feature_name:double,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_max:8|g|#ingestion_job_name:job,feast_feature_name:double,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_mean:8|g|#ingestion_job_name:job,feast_feature_name:double,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_percentile_50:8|g|#ingestion_job_name:job,feast_feature_name:double,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_percentile_90:8|g|#ingestion_job_name:job,feast_feature_name:double,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+
+feast_ingestion.feature_value_min:0|g|#ingestion_job_name:job,feast_feature_name:float,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_max:10|g|#ingestion_job_name:job,feast_feature_name:float,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_mean:5|g|#ingestion_job_name:job,feast_feature_name:float,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_percentile_50:5|g|#ingestion_job_name:job,feast_feature_name:float,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_percentile_90:10|g|#ingestion_job_name:job,feast_feature_name:float,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+
+feast_ingestion.feature_value_min:1|g|#ingestion_job_name:job,feast_feature_name:bool,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_max:1|g|#ingestion_job_name:job,feast_feature_name:bool,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_mean:1|g|#ingestion_job_name:job,feast_feature_name:bool,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_percentile_50:1|g|#ingestion_job_name:job,feast_feature_name:bool,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_percentile_90:1|g|#ingestion_job_name:job,feast_feature_name:bool,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+
+feast_ingestion.feature_value_min:1|g|#ingestion_job_name:job,feast_feature_name:int32list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_max:12|g|#ingestion_job_name:job,feast_feature_name:int32list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_mean:4|g|#ingestion_job_name:job,feast_feature_name:int32list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_percentile_50:3|g|#ingestion_job_name:job,feast_feature_name:int32list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_percentile_90:12|g|#ingestion_job_name:job,feast_feature_name:int32list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+
+feast_ingestion.feature_value_min:1|g|#ingestion_job_name:job,feast_feature_name:int64list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_max:12|g|#ingestion_job_name:job,feast_feature_name:int64list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_mean:5|g|#ingestion_job_name:job,feast_feature_name:int64list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_percentile_50:5|g|#ingestion_job_name:job,feast_feature_name:int64list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_percentile_90:12|g|#ingestion_job_name:job,feast_feature_name:int64list,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+
+feast_ingestion.feature_value_min:3|g|#ingestion_job_name:job,feast_feature_name:doublelist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_max:7|g|#ingestion_job_name:job,feast_feature_name:doublelist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_mean:5|g|#ingestion_job_name:job,feast_feature_name:doublelist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_percentile_50:5|g|#ingestion_job_name:job,feast_feature_name:doublelist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_percentile_90:7|g|#ingestion_job_name:job,feast_feature_name:doublelist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+
+feast_ingestion.feature_value_min:0|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_min:-3|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_max:0|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_max:-1|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_mean:0|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_mean:-2|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_percentile_50:0|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_percentile_50:-2|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_percentile_90:0|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_percentile_90:-1|g|#ingestion_job_name:job,feast_feature_name:floatlist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+
+feast_ingestion.feature_value_min:0|g|#ingestion_job_name:job,feast_feature_name:boollist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_max:1|g|#ingestion_job_name:job,feast_feature_name:boollist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_mean:0.5|g|#ingestion_job_name:job,feast_feature_name:boollist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_percentile_50:0.5|g|#ingestion_job_name:job,feast_feature_name:boollist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
+feast_ingestion.feature_value_percentile_90:1|g|#ingestion_job_name:job,feast_feature_name:boollist,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store
\ No newline at end of file
From c586d0e3967a1e56218e5136d8277c7c7d5949b9 Mon Sep 17 00:00:00 2001
From: Iain Rauch
Date: Tue, 25 Feb 2020 23:08:40 +0000
Subject: [PATCH 09/13] Allow use of secure gRPC in Feast Python client. (#459)
* Allow use of secure gRPC in Feast Python client.
* Add tests for secure gRPC in Python client.
(cherry picked from commit a576a53df4765908bdc817bbaead86abeefb92e8)
---
sdk/python/feast/client.py | 83 +++++++--
sdk/python/requirements-ci.txt | 1 +
sdk/python/tests/data/localhost.crt | 18 ++
sdk/python/tests/data/localhost.key | 28 +++
sdk/python/tests/data/localhost.pem | 18 ++
sdk/python/tests/test_client.py | 273 +++++++++++++++++++---------
6 files changed, 326 insertions(+), 95 deletions(-)
create mode 100644 sdk/python/tests/data/localhost.crt
create mode 100644 sdk/python/tests/data/localhost.key
create mode 100644 sdk/python/tests/data/localhost.pem
diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py
index a68f0fe2bc5..4147cc4af28 100644
--- a/sdk/python/feast/client.py
+++ b/sdk/python/feast/client.py
@@ -21,7 +21,6 @@
from collections import OrderedDict
from math import ceil
from typing import Dict, List, Tuple, Union, Optional
-from typing import List
from urllib.parse import urlparse
import fastavro
@@ -29,6 +28,7 @@
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
+
from feast.core.CoreService_pb2 import (
GetFeastCoreVersionRequest,
ListFeatureSetsResponse,
@@ -48,11 +48,11 @@
from feast.core.FeatureSet_pb2 import FeatureSetStatus
from feast.feature_set import FeatureSet, Entity
from feast.job import Job
-from feast.serving.ServingService_pb2 import FeatureReference
from feast.loaders.abstract_producer import get_producer
from feast.loaders.file import export_source_to_staging_location
from feast.loaders.ingest import KAFKA_CHUNK_PRODUCTION_TIMEOUT
from feast.loaders.ingest import get_feature_row_chunks
+from feast.serving.ServingService_pb2 import FeatureReference
from feast.serving.ServingService_pb2 import GetFeastServingInfoResponse
from feast.serving.ServingService_pb2 import (
GetOnlineFeaturesRequest,
@@ -69,9 +69,11 @@
GRPC_CONNECTION_TIMEOUT_DEFAULT = 3 # type: int
GRPC_CONNECTION_TIMEOUT_APPLY = 600 # type: int
-FEAST_SERVING_URL_ENV_KEY = "FEAST_SERVING_URL" # type: str
-FEAST_CORE_URL_ENV_KEY = "FEAST_CORE_URL" # type: str
-FEAST_PROJECT_ENV_KEY = "FEAST_PROJECT" # type: str
+FEAST_CORE_URL_ENV_KEY = "FEAST_CORE_URL"
+FEAST_SERVING_URL_ENV_KEY = "FEAST_SERVING_URL"
+FEAST_PROJECT_ENV_KEY = "FEAST_PROJECT"
+FEAST_CORE_SECURE_ENV_KEY = "FEAST_CORE_SECURE"
+FEAST_SERVING_SECURE_ENV_KEY = "FEAST_SERVING_SECURE"
BATCH_FEATURE_REQUEST_WAIT_TIME_SECONDS = 300
CPU_COUNT = os.cpu_count() # type: int
@@ -82,7 +84,8 @@ class Client:
"""
def __init__(
- self, core_url: str = None, serving_url: str = None, project: str = None
+ self, core_url: str = None, serving_url: str = None, project: str = None,
+ core_secure: bool = None, serving_secure: bool = None
):
"""
The Feast Client should be initialized with at least one service url
@@ -91,10 +94,14 @@ def __init__(
core_url: Feast Core URL. Used to manage features
serving_url: Feast Serving URL. Used to retrieve features
project: Sets the active project. This field is optional.
- """
- self._core_url = core_url
- self._serving_url = serving_url
- self._project = project
+ core_secure: Use client-side SSL/TLS for Core gRPC API
+ serving_secure: Use client-side SSL/TLS for Serving gRPC API
+ """
+ self._core_url: str = core_url
+ self._serving_url: str = serving_url
+ self._project: str = project
+ self._core_secure: bool = core_secure
+ self._serving_secure: bool = serving_secure
self.__core_channel: grpc.Channel = None
self.__serving_channel: grpc.Channel = None
self._core_service_stub: CoreServiceStub = None
@@ -149,6 +156,52 @@ def serving_url(self, value: str):
"""
self._serving_url = value
+ @property
+ def core_secure(self) -> bool:
+ """
+ Retrieve Feast Core client-side SSL/TLS setting
+
+ Returns:
+ Whether client-side SSL/TLS is enabled
+ """
+
+ if self._core_secure is not None:
+ return self._core_secure
+ return os.getenv(FEAST_CORE_SECURE_ENV_KEY, "").lower() is "true"
+
+ @core_secure.setter
+ def core_secure(self, value: bool):
+ """
+ Set the Feast Core client-side SSL/TLS setting
+
+ Args:
+ value: True to enable client-side SSL/TLS
+ """
+ self._core_secure = value
+
+ @property
+ def serving_secure(self) -> bool:
+ """
+ Retrieve Feast Serving client-side SSL/TLS setting
+
+ Returns:
+ Whether client-side SSL/TLS is enabled
+ """
+
+ if self._serving_secure is not None:
+ return self._serving_secure
+ return os.getenv(FEAST_SERVING_SECURE_ENV_KEY, "").lower() is "true"
+
+ @serving_secure.setter
+ def serving_secure(self, value: bool):
+ """
+ Set the Feast Serving client-side SSL/TLS setting
+
+ Args:
+ value: True to enable client-side SSL/TLS
+ """
+ self._serving_secure = value
+
def version(self):
"""
Returns version information from Feast Core and Feast Serving
@@ -185,7 +238,10 @@ def _connect_core(self, skip_if_connected: bool = True):
raise ValueError("Please set Feast Core URL.")
if self.__core_channel is None:
- self.__core_channel = grpc.insecure_channel(self.core_url)
+ if self.core_secure or self.core_url.endswith(":443"):
+ self.__core_channel = grpc.secure_channel(self.core_url, grpc.ssl_channel_credentials())
+ else:
+ self.__core_channel = grpc.insecure_channel(self.core_url)
try:
grpc.channel_ready_future(self.__core_channel).result(
@@ -214,7 +270,10 @@ def _connect_serving(self, skip_if_connected=True):
raise ValueError("Please set Feast Serving URL.")
if self.__serving_channel is None:
- self.__serving_channel = grpc.insecure_channel(self.serving_url)
+ if self.serving_secure or self.serving_url.endswith(":443"):
+ self.__serving_channel = grpc.secure_channel(self.serving_url, grpc.ssl_channel_credentials())
+ else:
+ self.__serving_channel = grpc.insecure_channel(self.serving_url)
try:
grpc.channel_ready_future(self.__serving_channel).result(
diff --git a/sdk/python/requirements-ci.txt b/sdk/python/requirements-ci.txt
index f3df60a02ec..b0ad509510e 100644
--- a/sdk/python/requirements-ci.txt
+++ b/sdk/python/requirements-ci.txt
@@ -11,6 +11,7 @@ mock==2.0.0
pandas==0.*
protobuf==3.*
pytest
+pytest-lazy-fixture==0.6.3
pytest-mock
pytest-timeout
PyYAML==5.1.2
diff --git a/sdk/python/tests/data/localhost.crt b/sdk/python/tests/data/localhost.crt
new file mode 100644
index 00000000000..1f471506aab
--- /dev/null
+++ b/sdk/python/tests/data/localhost.crt
@@ -0,0 +1,18 @@
+-----BEGIN CERTIFICATE-----
+MIIC5zCCAc+gAwIBAgIJAKzukpnyuwsVMA0GCSqGSIb3DQEBCwUAMBQxEjAQBgNV
+BAMMCWxvY2FsaG9zdDAgFw0yMDAyMTcxMTE4NDNaGA8zMDE5MDYyMDExMTg0M1ow
+FDESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB
+CgKCAQEAqoanhiy4EUZjPA/m8IWk50OyTjKAnqZvEW5glqmTHP6lQbfyWQnzj3Ny
+c++4Xn901FO2v07h+7lE3BScjgCX6klsLOHRnWcLX8lQygR6zzO+Oey1yXuCebBA
+yhrsqgTDC/8zoCxe0W3t0vqvE4AJs3tJHq5Y1ba/X9OiKKsDZuMSSsbdd4qVEL6y
+BD8PRNLT/iiD84Kq58GZtOI3fJls8E/bYbvksugcPI3kmlU4Plg3VrVplMl3DcMz
+7BbvQP6jmVqdPtUT7+lL0C5CsNqbdDOIwg09+Gwus+A/g8PerBBd+ZCmdvSa9LYJ
+OmlJszgZPIL9AagXLfuGQvNN2Y6WowIDAQABozowODAUBgNVHREEDTALgglsb2Nh
+bGhvc3QwCwYDVR0PBAQDAgeAMBMGA1UdJQQMMAoGCCsGAQUFBwMBMA0GCSqGSIb3
+DQEBCwUAA4IBAQAuF1/VeQL73Y1FKrBX4bAb/Rdh2+Dadpi+w1pgEOi3P4udmQ+y
+Xn9GwwLRQmHRLjyCT5KT8lNHdldPdlBamqPGGku449aCAjA/YHVHhcHaXl0MtPGq
+BfKhHYSsvI2sIymlzZIvvIaf04yuJ1g+L0j8Px4Ecor9YwcKDZmpnIXLgdUtUrIQ
+5Omrb4jImX6q8jp6Bjplb4H3o4TqKoa74NLOWUiH5/Rix3Lo8MRoEVbX2GhKk+8n
+0eD3AuyrI1i+ce7zY8qGJKKFHGLDWPA/+006ZIS4j/Hr2FWo07CPFQ4/3gdJ8Erw
+SzgO9vvIhQrBJn2CIH4+P5Cb1ktdobNWW9XK
+-----END CERTIFICATE-----
diff --git a/sdk/python/tests/data/localhost.key b/sdk/python/tests/data/localhost.key
new file mode 100644
index 00000000000..dbd9cda062c
--- /dev/null
+++ b/sdk/python/tests/data/localhost.key
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCqhqeGLLgRRmM8
+D+bwhaTnQ7JOMoCepm8RbmCWqZMc/qVBt/JZCfOPc3Jz77hef3TUU7a/TuH7uUTc
+FJyOAJfqSWws4dGdZwtfyVDKBHrPM7457LXJe4J5sEDKGuyqBMML/zOgLF7Rbe3S
++q8TgAmze0kerljVtr9f06IoqwNm4xJKxt13ipUQvrIEPw9E0tP+KIPzgqrnwZm0
+4jd8mWzwT9thu+Sy6Bw8jeSaVTg+WDdWtWmUyXcNwzPsFu9A/qOZWp0+1RPv6UvQ
+LkKw2pt0M4jCDT34bC6z4D+Dw96sEF35kKZ29Jr0tgk6aUmzOBk8gv0BqBct+4ZC
+803ZjpajAgMBAAECggEADE4FHphxe8WheX8IQgjSumFXJ29bepc14oMdcyGvXOM/
+F3vnf+dI7Ov+sUD2A9OcoYmc4TcW9WwL/Pl7xn9iduRvatmsn3gFCRdkvf8OwY7R
+Riq/f1drNc6zDiJdO3N2g5IZrpAlE2WkSJoQMg8GJC5cO1uHS3yRWJ/Tzq1wZGcW
+Dot9hAFgN0qNdP0xFkOsPM5ptC3DjLqsZWboJhIM19hgsIYaWQWHvcYlCcWTVhkj
+FYzvLj5GrzAgyE89RpdXus670q5E2R2Rlnja21TfcxK0UOdIrKghZ0jxZMsXEwdB
+8V7kIzL5kh//RhT/dIt0mHNMSdLFFx3yMTb2wTzpWQKBgQDRiCRslDSjiNSFySkn
+6IivAwJtV2gLSxV05D9u9lrrlskHogrZUJkpVF1VzSnwv/ASaCZX4AGTtNPaz+vy
+yDviwfjADsuum8jkzoxKCHnR1HVMyX+vm/g+pE20PMskTUuDE4zROtrqo9Ky0afv
+94mJrf93Q815rsbEM5osugaeBQKBgQDQWAPTKy1wcG7edwfu3EaLYHPZ8pW9MldP
+FvCLTMwSDkSzU+wA4BGE/5Tuu0WHSAfUc5C1LnMQXKBQXun+YCaBR6GZjUAmntz3
+poBIOYaxe651zqzCmo4ip1h5wIfPvynsyGmhsbpDSNhvXFgH2mF3XSY1nduKSRHu
+389cHk3ahwKBgA4gAWSYcRv9I2aJcw7PrDcwGr/IPqlUPHQO1v/h96seFRtAnz6b
+IlgY6dnY5NTn+4UiJEOUREbyz71Weu949CCLNvurg6uXsOlLy0VKYPv2OJoek08B
+UrDWXq6h0of19fs2HC4Wq59Zv+ByJcIVi94OLsSZe4aSc6/SUrhlKgEJAoGBAIvR
+5Y88NNx2uBEYdPx6W+WBr34e7Rrxw+JSFNCHk5SyeqyWr5XOyjMliv/EMl8dmhOc
+Ewtkxte+MeB+Mi8CvBSay/rO7rR8fPK+jOzrnldSF7z8HLjlHGppQFlFOl/TfQFp
+ZmqbadNp+caShImQp0SCAPiOnh1p+F0FWpYJyFnVAoGAKhSRP0iUmd+tId94px2m
+G248BhcM9/0r+Y3yRX1eBx5eBzlzPUPcW1MSbhiZ1DIyLZ/MyObl98A1oNBGun11
+H/7Mq0E8BcJoXmt/6Z+2NhREBV9tDNuINyS/coYBV7H50pnSqyPpREPxNmu3Ukbm
+u7ggLRfH+DexDysbpbCZ9l4=
+-----END PRIVATE KEY-----
diff --git a/sdk/python/tests/data/localhost.pem b/sdk/python/tests/data/localhost.pem
new file mode 100644
index 00000000000..1f471506aab
--- /dev/null
+++ b/sdk/python/tests/data/localhost.pem
@@ -0,0 +1,18 @@
+-----BEGIN CERTIFICATE-----
+MIIC5zCCAc+gAwIBAgIJAKzukpnyuwsVMA0GCSqGSIb3DQEBCwUAMBQxEjAQBgNV
+BAMMCWxvY2FsaG9zdDAgFw0yMDAyMTcxMTE4NDNaGA8zMDE5MDYyMDExMTg0M1ow
+FDESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB
+CgKCAQEAqoanhiy4EUZjPA/m8IWk50OyTjKAnqZvEW5glqmTHP6lQbfyWQnzj3Ny
+c++4Xn901FO2v07h+7lE3BScjgCX6klsLOHRnWcLX8lQygR6zzO+Oey1yXuCebBA
+yhrsqgTDC/8zoCxe0W3t0vqvE4AJs3tJHq5Y1ba/X9OiKKsDZuMSSsbdd4qVEL6y
+BD8PRNLT/iiD84Kq58GZtOI3fJls8E/bYbvksugcPI3kmlU4Plg3VrVplMl3DcMz
+7BbvQP6jmVqdPtUT7+lL0C5CsNqbdDOIwg09+Gwus+A/g8PerBBd+ZCmdvSa9LYJ
+OmlJszgZPIL9AagXLfuGQvNN2Y6WowIDAQABozowODAUBgNVHREEDTALgglsb2Nh
+bGhvc3QwCwYDVR0PBAQDAgeAMBMGA1UdJQQMMAoGCCsGAQUFBwMBMA0GCSqGSIb3
+DQEBCwUAA4IBAQAuF1/VeQL73Y1FKrBX4bAb/Rdh2+Dadpi+w1pgEOi3P4udmQ+y
+Xn9GwwLRQmHRLjyCT5KT8lNHdldPdlBamqPGGku449aCAjA/YHVHhcHaXl0MtPGq
+BfKhHYSsvI2sIymlzZIvvIaf04yuJ1g+L0j8Px4Ecor9YwcKDZmpnIXLgdUtUrIQ
+5Omrb4jImX6q8jp6Bjplb4H3o4TqKoa74NLOWUiH5/Rix3Lo8MRoEVbX2GhKk+8n
+0eD3AuyrI1i+ce7zY8qGJKKFHGLDWPA/+006ZIS4j/Hr2FWo07CPFQ4/3gdJ8Erw
+SzgO9vvIhQrBJn2CIH4+P5Cb1ktdobNWW9XK
+-----END CERTIFICATE-----
diff --git a/sdk/python/tests/test_client.py b/sdk/python/tests/test_client.py
index 123cbe47fd6..2724fff52e3 100644
--- a/sdk/python/tests/test_client.py
+++ b/sdk/python/tests/test_client.py
@@ -11,9 +11,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import pkgutil
from datetime import datetime
import tempfile
+from unittest import mock
+
import grpc
import pandas as pd
from google.protobuf.duration_pb2 import Duration
@@ -63,10 +66,38 @@
CORE_URL = "core.feast.example.com"
SERVING_URL = "serving.example.com"
+_PRIVATE_KEY_RESOURCE_PATH = 'data/localhost.key'
+_CERTIFICATE_CHAIN_RESOURCE_PATH = 'data/localhost.pem'
+_ROOT_CERTIFICATE_RESOURCE_PATH = 'data/localhost.crt'
class TestClient:
- @pytest.fixture(scope="function")
+
+ @pytest.fixture
+ def secure_mock_client(self, mocker):
+ client = Client(core_url=CORE_URL, serving_url=SERVING_URL, core_secure=True, serving_secure=True)
+ mocker.patch.object(client, "_connect_core")
+ mocker.patch.object(client, "_connect_serving")
+ client._core_url = CORE_URL
+ client._serving_url = SERVING_URL
+ return client
+
+ @pytest.fixture
+ def mock_client(self, mocker):
+ client = Client(core_url=CORE_URL, serving_url=SERVING_URL)
+ mocker.patch.object(client, "_connect_core")
+ mocker.patch.object(client, "_connect_serving")
+ client._core_url = CORE_URL
+ client._serving_url = SERVING_URL
+ return client
+
+ @pytest.fixture
+ def server_credentials(self):
+ private_key = pkgutil.get_data(__name__, _PRIVATE_KEY_RESOURCE_PATH)
+ certificate_chain = pkgutil.get_data(__name__, _CERTIFICATE_CHAIN_RESOURCE_PATH)
+ return grpc.ssl_server_credentials(((private_key, certificate_chain),))
+
+ @pytest.fixture
def core_server(self):
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
Core.add_CoreServiceServicer_to_server(CoreServicer(), server)
@@ -75,7 +106,7 @@ def core_server(self):
yield server
server.stop(0)
- @pytest.fixture(scope="function")
+ @pytest.fixture
def serving_server(self):
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
Serving.add_ServingServiceServicer_to_server(ServingServicer(), server)
@@ -85,48 +116,73 @@ def serving_server(self):
server.stop(0)
@pytest.fixture
- def mock_client(self, mocker):
- client = Client(core_url=CORE_URL, serving_url=SERVING_URL)
- mocker.patch.object(client, "_connect_core")
- mocker.patch.object(client, "_connect_serving")
- client._core_url = CORE_URL
- client._serving_url = SERVING_URL
- return client
+ def secure_core_server(self, server_credentials):
+ server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+ Core.add_CoreServiceServicer_to_server(CoreServicer(), server)
+ server.add_secure_port("[::]:50053", server_credentials)
+ server.start()
+ yield server
+ server.stop(0)
+
+ @pytest.fixture
+ def secure_serving_server(self, server_credentials):
+ server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+ Serving.add_ServingServiceServicer_to_server(ServingServicer(), server)
+
+ server.add_secure_port("[::]:50054", server_credentials)
+ server.start()
+ yield server
+ server.stop(0)
+
+ @pytest.fixture
+ def secure_client(self, secure_core_server, secure_serving_server):
+ root_certificate_credentials = pkgutil.get_data(__name__, _ROOT_CERTIFICATE_RESOURCE_PATH)
+ # this is needed to establish a secure connection using self-signed certificates, for the purpose of the test
+ ssl_channel_credentials = grpc.ssl_channel_credentials(root_certificates=root_certificate_credentials)
+ with mock.patch("grpc.ssl_channel_credentials", MagicMock(return_value=ssl_channel_credentials)):
+ yield Client(core_url="localhost:50053", serving_url="localhost:50054", core_secure=True,
+ serving_secure=True)
@pytest.fixture
def client(self, core_server, serving_server):
return Client(core_url="localhost:50051", serving_url="localhost:50052")
- def test_version(self, mock_client, mocker):
- mock_client._core_service_stub = Core.CoreServiceStub(grpc.insecure_channel(""))
- mock_client._serving_service_stub = Serving.ServingServiceStub(
+ @pytest.mark.parametrize("mocked_client", [pytest.lazy_fixture("mock_client"),
+ pytest.lazy_fixture("secure_mock_client")
+ ])
+ def test_version(self, mocked_client, mocker):
+ mocked_client._core_service_stub = Core.CoreServiceStub(grpc.insecure_channel(""))
+ mocked_client._serving_service_stub = Serving.ServingServiceStub(
grpc.insecure_channel("")
)
mocker.patch.object(
- mock_client._core_service_stub,
+ mocked_client._core_service_stub,
"GetFeastCoreVersion",
return_value=GetFeastCoreVersionResponse(version="0.3.2"),
)
mocker.patch.object(
- mock_client._serving_service_stub,
+ mocked_client._serving_service_stub,
"GetFeastServingInfo",
return_value=GetFeastServingInfoResponse(version="0.3.2"),
)
- status = mock_client.version()
+ status = mocked_client.version()
assert (
- status["core"]["url"] == CORE_URL
- and status["core"]["version"] == "0.3.2"
- and status["serving"]["url"] == SERVING_URL
- and status["serving"]["version"] == "0.3.2"
+ status["core"]["url"] == CORE_URL
+ and status["core"]["version"] == "0.3.2"
+ and status["serving"]["url"] == SERVING_URL
+ and status["serving"]["version"] == "0.3.2"
)
- def test_get_online_features(self, mock_client, mocker):
+ @pytest.mark.parametrize("mocked_client", [pytest.lazy_fixture("mock_client"),
+ pytest.lazy_fixture("secure_mock_client")
+ ])
+ def test_get_online_features(self, mocked_client, mocker):
ROW_COUNT = 300
- mock_client._serving_service_stub = Serving.ServingServiceStub(
+ mocked_client._serving_service_stub = Serving.ServingServiceStub(
grpc.insecure_channel("")
)
@@ -148,12 +204,12 @@ def test_get_online_features(self, mock_client, mocker):
)
mocker.patch.object(
- mock_client._serving_service_stub,
+ mocked_client._serving_service_stub,
"GetOnlineFeatures",
return_value=response,
)
- response = mock_client.get_online_features(
+ response = mocked_client.get_online_features(
entity_rows=entity_rows,
feature_refs=[
"my_project/feature_1:1",
@@ -169,17 +225,20 @@ def test_get_online_features(self, mock_client, mocker):
) # type: GetOnlineFeaturesResponse
assert (
- response.field_values[0].fields["my_project/feature_1:1"].int64_val == 1
- and response.field_values[0].fields["my_project/feature_9:1"].int64_val == 9
+ response.field_values[0].fields["my_project/feature_1:1"].int64_val == 1
+ and response.field_values[0].fields["my_project/feature_9:1"].int64_val == 9
)
- def test_get_feature_set(self, mock_client, mocker):
- mock_client._core_service_stub = Core.CoreServiceStub(grpc.insecure_channel(""))
+ @pytest.mark.parametrize("mocked_client", [pytest.lazy_fixture("mock_client"),
+ pytest.lazy_fixture("secure_mock_client")
+ ])
+ def test_get_feature_set(self, mocked_client, mocker):
+ mocked_client._core_service_stub = Core.CoreServiceStub(grpc.insecure_channel(""))
from google.protobuf.duration_pb2 import Duration
mocker.patch.object(
- mock_client._core_service_stub,
+ mocked_client._core_service_stub,
"GetFeatureSet",
return_value=GetFeatureSetResponse(
feature_set=FeatureSetProto(
@@ -214,29 +273,32 @@ def test_get_feature_set(self, mock_client, mocker):
)
),
)
- mock_client.set_project("my_project")
- feature_set = mock_client.get_feature_set("my_feature_set", version=2)
+ mocked_client.set_project("my_project")
+ feature_set = mocked_client.get_feature_set("my_feature_set", version=2)
assert (
- feature_set.name == "my_feature_set"
- and feature_set.version == 2
- and feature_set.fields["my_feature_1"].name == "my_feature_1"
- and feature_set.fields["my_feature_1"].dtype == ValueType.FLOAT
- and feature_set.fields["my_entity_1"].name == "my_entity_1"
- and feature_set.fields["my_entity_1"].dtype == ValueType.INT64
- and len(feature_set.features) == 2
- and len(feature_set.entities) == 1
+ feature_set.name == "my_feature_set"
+ and feature_set.version == 2
+ and feature_set.fields["my_feature_1"].name == "my_feature_1"
+ and feature_set.fields["my_feature_1"].dtype == ValueType.FLOAT
+ and feature_set.fields["my_entity_1"].name == "my_entity_1"
+ and feature_set.fields["my_entity_1"].dtype == ValueType.INT64
+ and len(feature_set.features) == 2
+ and len(feature_set.entities) == 1
)
- def test_get_batch_features(self, mock_client, mocker):
+ @pytest.mark.parametrize("mocked_client", [pytest.lazy_fixture("mock_client"),
+ pytest.lazy_fixture("secure_mock_client")
+ ])
+ def test_get_batch_features(self, mocked_client, mocker):
- mock_client._serving_service_stub = Serving.ServingServiceStub(
+ mocked_client._serving_service_stub = Serving.ServingServiceStub(
grpc.insecure_channel("")
)
- mock_client._core_service_stub = Core.CoreServiceStub(grpc.insecure_channel(""))
+ mocked_client._core_service_stub = Core.CoreServiceStub(grpc.insecure_channel(""))
mocker.patch.object(
- mock_client._core_service_stub,
+ mocked_client._core_service_stub,
"GetFeatureSet",
return_value=GetFeatureSetResponse(
feature_set=FeatureSetProto(
@@ -283,7 +345,7 @@ def test_get_batch_features(self, mock_client, mocker):
to_avro(file_path_or_buffer=final_results, df=expected_dataframe)
mocker.patch.object(
- mock_client._serving_service_stub,
+ mocked_client._serving_service_stub,
"GetBatchFeatures",
return_value=GetBatchFeaturesResponse(
job=BatchFeaturesJob(
@@ -297,7 +359,7 @@ def test_get_batch_features(self, mock_client, mocker):
)
mocker.patch.object(
- mock_client._serving_service_stub,
+ mocked_client._serving_service_stub,
"GetJob",
return_value=GetJobResponse(
job=BatchFeaturesJob(
@@ -311,7 +373,7 @@ def test_get_batch_features(self, mock_client, mocker):
)
mocker.patch.object(
- mock_client._serving_service_stub,
+ mocked_client._serving_service_stub,
"GetFeastServingInfo",
return_value=GetFeastServingInfoResponse(
job_staging_location=f"file://{tempfile.mkdtemp()}/",
@@ -319,8 +381,8 @@ def test_get_batch_features(self, mock_client, mocker):
),
)
- mock_client.set_project("project1")
- response = mock_client.get_batch_features(
+ mocked_client.set_project("project1")
+ response = mocked_client.get_batch_features(
entity_rows=pd.DataFrame(
{
"datetime": [
@@ -348,9 +410,12 @@ def test_get_batch_features(self, mock_client, mocker):
]
)
- def test_apply_feature_set_success(self, client):
+ @pytest.mark.parametrize("test_client", [pytest.lazy_fixture("client"),
+ pytest.lazy_fixture("secure_client")
+ ])
+ def test_apply_feature_set_success(self, test_client):
- client.set_project("project1")
+ test_client.set_project("project1")
# Create Feature Sets
fs1 = FeatureSet("my-feature-set-1")
@@ -364,23 +429,24 @@ def test_apply_feature_set_success(self, client):
fs2.add(Entity(name="fs2-my-entity-1", dtype=ValueType.INT64))
# Register Feature Set with Core
- client.apply(fs1)
- client.apply(fs2)
+ test_client.apply(fs1)
+ test_client.apply(fs2)
- feature_sets = client.list_feature_sets()
+ feature_sets = test_client.list_feature_sets()
# List Feature Sets
assert (
- len(feature_sets) == 2
- and feature_sets[0].name == "my-feature-set-1"
- and feature_sets[0].features[0].name == "fs1-my-feature-1"
- and feature_sets[0].features[0].dtype == ValueType.INT64
- and feature_sets[1].features[1].dtype == ValueType.BYTES_LIST
+ len(feature_sets) == 2
+ and feature_sets[0].name == "my-feature-set-1"
+ and feature_sets[0].features[0].name == "fs1-my-feature-1"
+ and feature_sets[0].features[0].dtype == ValueType.INT64
+ and feature_sets[1].features[1].dtype == ValueType.BYTES_LIST
)
- @pytest.mark.parametrize("dataframe", [dataframes.GOOD])
- def test_feature_set_ingest_success(self, dataframe, client, mocker):
- client.set_project("project1")
+ @pytest.mark.parametrize("dataframe,test_client", [(dataframes.GOOD, pytest.lazy_fixture("client")),
+ (dataframes.GOOD, pytest.lazy_fixture("secure_client"))])
+ def test_feature_set_ingest_success(self, dataframe, test_client, mocker):
+ test_client.set_project("project1")
driver_fs = FeatureSet(
"driver-feature-set", source=KafkaSource(brokers="kafka:9092", topic="test")
)
@@ -390,12 +456,12 @@ def test_feature_set_ingest_success(self, dataframe, client, mocker):
driver_fs.add(Entity(name="entity_id", dtype=ValueType.INT64))
# Register with Feast core
- client.apply(driver_fs)
+ test_client.apply(driver_fs)
driver_fs = driver_fs.to_proto()
driver_fs.meta.status = FeatureSetStatusProto.STATUS_READY
mocker.patch.object(
- client._core_service_stub,
+ test_client._core_service_stub,
"GetFeatureSet",
return_value=GetFeatureSetResponse(feature_set=driver_fs),
)
@@ -403,14 +469,16 @@ def test_feature_set_ingest_success(self, dataframe, client, mocker):
# Need to create a mock producer
with patch("feast.client.get_producer") as mocked_queue:
# Ingest data into Feast
- client.ingest("driver-feature-set", dataframe)
+ test_client.ingest("driver-feature-set", dataframe)
- @pytest.mark.parametrize("dataframe,exception", [(dataframes.GOOD, TimeoutError)])
+ @pytest.mark.parametrize("dataframe,exception,test_client",
+ [(dataframes.GOOD, TimeoutError, pytest.lazy_fixture("client")),
+ (dataframes.GOOD, TimeoutError, pytest.lazy_fixture("secure_client"))])
def test_feature_set_ingest_fail_if_pending(
- self, dataframe, exception, client, mocker
+ self, dataframe, exception, test_client, mocker
):
with pytest.raises(exception):
- client.set_project("project1")
+ test_client.set_project("project1")
driver_fs = FeatureSet(
"driver-feature-set",
source=KafkaSource(brokers="kafka:9092", topic="test"),
@@ -421,12 +489,12 @@ def test_feature_set_ingest_fail_if_pending(
driver_fs.add(Entity(name="entity_id", dtype=ValueType.INT64))
# Register with Feast core
- client.apply(driver_fs)
+ test_client.apply(driver_fs)
driver_fs = driver_fs.to_proto()
driver_fs.meta.status = FeatureSetStatusProto.STATUS_PENDING
mocker.patch.object(
- client._core_service_stub,
+ test_client._core_service_stub,
"GetFeatureSet",
return_value=GetFeatureSetResponse(feature_set=driver_fs),
)
@@ -434,18 +502,22 @@ def test_feature_set_ingest_fail_if_pending(
# Need to create a mock producer
with patch("feast.client.get_producer") as mocked_queue:
# Ingest data into Feast
- client.ingest("driver-feature-set", dataframe, timeout=1)
+ test_client.ingest("driver-feature-set", dataframe, timeout=1)
@pytest.mark.parametrize(
- "dataframe,exception",
+ "dataframe,exception,test_client",
[
- (dataframes.BAD_NO_DATETIME, Exception),
- (dataframes.BAD_INCORRECT_DATETIME_TYPE, Exception),
- (dataframes.BAD_NO_ENTITY, Exception),
- (dataframes.NO_FEATURES, Exception),
+ (dataframes.BAD_NO_DATETIME, Exception, pytest.lazy_fixture("client")),
+ (dataframes.BAD_INCORRECT_DATETIME_TYPE, Exception, pytest.lazy_fixture("client")),
+ (dataframes.BAD_NO_ENTITY, Exception, pytest.lazy_fixture("client")),
+ (dataframes.NO_FEATURES, Exception, pytest.lazy_fixture("client")),
+ (dataframes.BAD_NO_DATETIME, Exception, pytest.lazy_fixture("secure_client")),
+ (dataframes.BAD_INCORRECT_DATETIME_TYPE, Exception, pytest.lazy_fixture("secure_client")),
+ (dataframes.BAD_NO_ENTITY, Exception, pytest.lazy_fixture("secure_client")),
+ (dataframes.NO_FEATURES, Exception, pytest.lazy_fixture("secure_client")),
],
)
- def test_feature_set_ingest_failure(self, client, dataframe, exception):
+ def test_feature_set_ingest_failure(self, test_client, dataframe, exception):
with pytest.raises(exception):
# Create feature set
driver_fs = FeatureSet("driver-feature-set")
@@ -454,15 +526,16 @@ def test_feature_set_ingest_failure(self, client, dataframe, exception):
driver_fs.infer_fields_from_df(dataframe)
# Register with Feast core
- client.apply(driver_fs)
+ test_client.apply(driver_fs)
# Ingest data into Feast
- client.ingest(driver_fs, dataframe=dataframe)
+ test_client.ingest(driver_fs, dataframe=dataframe)
- @pytest.mark.parametrize("dataframe", [dataframes.ALL_TYPES])
- def test_feature_set_types_success(self, client, dataframe, mocker):
+ @pytest.mark.parametrize("dataframe,test_client", [(dataframes.ALL_TYPES, pytest.lazy_fixture("client")),
+ (dataframes.ALL_TYPES, pytest.lazy_fixture("secure_client"))])
+ def test_feature_set_types_success(self, test_client, dataframe, mocker):
- client.set_project("project1")
+ test_client.set_project("project1")
all_types_fs = FeatureSet(
name="all_types",
@@ -489,10 +562,10 @@ def test_feature_set_types_success(self, client, dataframe, mocker):
)
# Register with Feast core
- client.apply(all_types_fs)
+ test_client.apply(all_types_fs)
mocker.patch.object(
- client._core_service_stub,
+ test_client._core_service_stub,
"GetFeatureSet",
return_value=GetFeatureSetResponse(feature_set=all_types_fs.to_proto()),
)
@@ -500,4 +573,38 @@ def test_feature_set_types_success(self, client, dataframe, mocker):
# Need to create a mock producer
with patch("feast.client.get_producer") as mocked_queue:
# Ingest data into Feast
- client.ingest(all_types_fs, dataframe)
+ test_client.ingest(all_types_fs, dataframe)
+
+ @patch("grpc.channel_ready_future")
+ def test_secure_channel_creation_with_secure_client(self, _mocked_obj):
+ client = Client(core_url="localhost:50051", serving_url="localhost:50052", serving_secure=True,
+ core_secure=True)
+ with mock.patch("grpc.secure_channel") as _grpc_mock, \
+ mock.patch("grpc.ssl_channel_credentials", MagicMock(return_value="test")) as _mocked_credentials:
+ client._connect_serving()
+ _grpc_mock.assert_called_with(client.serving_url, _mocked_credentials.return_value)
+
+ @mock.patch("grpc.channel_ready_future")
+ def test_secure_channel_creation_with_secure_serving_url(self, _mocked_obj, ):
+ client = Client(core_url="localhost:50051", serving_url="localhost:443")
+ with mock.patch("grpc.secure_channel") as _grpc_mock, \
+ mock.patch("grpc.ssl_channel_credentials", MagicMock(return_value="test")) as _mocked_credentials:
+ client._connect_serving()
+ _grpc_mock.assert_called_with(client.serving_url, _mocked_credentials.return_value)
+
+ @patch("grpc.channel_ready_future")
+ def test_secure_channel_creation_with_secure_client(self, _mocked_obj):
+ client = Client(core_url="localhost:50053", serving_url="localhost:50054", serving_secure=True,
+ core_secure=True)
+ with mock.patch("grpc.secure_channel") as _grpc_mock, \
+ mock.patch("grpc.ssl_channel_credentials", MagicMock(return_value="test")) as _mocked_credentials:
+ client._connect_core()
+ _grpc_mock.assert_called_with(client.core_url, _mocked_credentials.return_value)
+
+ @patch("grpc.channel_ready_future")
+ def test_secure_channel_creation_with_secure_core_url(self, _mocked_obj):
+ client = Client(core_url="localhost:443", serving_url="localhost:50054")
+ with mock.patch("grpc.secure_channel") as _grpc_mock, \
+ mock.patch("grpc.ssl_channel_credentials", MagicMock(return_value="test")) as _mocked_credentials:
+ client._connect_core()
+ _grpc_mock.assert_called_with(client.core_url, _mocked_credentials.return_value)
\ No newline at end of file
From 6508e423a5e0b5e67940b688ceb439f0464b8635 Mon Sep 17 00:00:00 2001
From: David Heryanto
Date: Wed, 26 Feb 2020 09:40:40 +0800
Subject: [PATCH 10/13] Rename metric name for request latency in feast serving
(#488)
So that it is consistent with the actual unit of timing being measured
And recommended metric names in Prometheus https://prometheus.io/docs/practices/naming/#metric-names
(cherry picked from commit 5758d99a7048d166dc011f6a97333e68811c2003)
---
.../main/java/feast/serving/service/RedisServingService.java | 2 +-
serving/src/main/java/feast/serving/util/Metrics.java | 4 ++--
2 files changed, 3 insertions(+), 3 deletions(-)
diff --git a/serving/src/main/java/feast/serving/service/RedisServingService.java b/serving/src/main/java/feast/serving/service/RedisServingService.java
index 48fc485214d..24c69b9f796 100644
--- a/serving/src/main/java/feast/serving/service/RedisServingService.java
+++ b/serving/src/main/java/feast/serving/service/RedisServingService.java
@@ -313,7 +313,7 @@ private List sendMultiGet(List keys) {
} finally {
requestLatency
.labels("sendMultiGet")
- .observe((System.currentTimeMillis() - startTime) / 1000);
+ .observe((System.currentTimeMillis() - startTime) / 1000d);
}
}
}
diff --git a/serving/src/main/java/feast/serving/util/Metrics.java b/serving/src/main/java/feast/serving/util/Metrics.java
index 99f6353e742..05546ec384b 100644
--- a/serving/src/main/java/feast/serving/util/Metrics.java
+++ b/serving/src/main/java/feast/serving/util/Metrics.java
@@ -24,9 +24,9 @@ public class Metrics {
public static final Histogram requestLatency =
Histogram.build()
.buckets(0.001, 0.002, 0.004, 0.006, 0.008, 0.01, 0.015, 0.02, 0.025, 0.03, 0.035, 0.05)
- .name("request_latency_ms")
+ .name("request_latency_seconds")
.subsystem("feast_serving")
- .help("Request latency in seconds.")
+ .help("Request latency in seconds")
.labelNames("method")
.register();
From ff6b8bfb26c2839cc7973b84961bad8efbeb177d Mon Sep 17 00:00:00 2001
From: David Heryanto
Date: Wed, 26 Feb 2020 13:43:24 +0800
Subject: [PATCH 11/13] Use Strings trim rather than strip since strip is only
available in java 11
---
.../transform/metrics/WriteFeatureValueMetricsDoFnTest.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/ingestion/src/test/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFnTest.java b/ingestion/src/test/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFnTest.java
index 8f0adf40168..7f487601101 100644
--- a/ingestion/src/test/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFnTest.java
+++ b/ingestion/src/test/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFnTest.java
@@ -142,7 +142,7 @@ private Map> readTestInput(String path) throws IOEx
}
List colNames = new ArrayList<>();
for (String line : lines) {
- if (line.strip().length() < 1) {
+ if (line.trim().length() < 1) {
continue;
}
String[] splits = line.split(",");
@@ -156,7 +156,7 @@ private Map> readTestInput(String path) throws IOEx
Builder featureRowBuilder = FeatureRow.newBuilder();
for (int i = 0; i < splits.length; i++) {
- String colVal = splits[i].strip();
+ String colVal = splits[i].trim();
if (i == 0) {
featureRowBuilder.setFeatureSet(colVal);
continue;
From e62c5073a8a4d01d71966a3509e68fc5e63e574f Mon Sep 17 00:00:00 2001
From: David Heryanto
Date: Wed, 26 Feb 2020 13:48:21 +0800
Subject: [PATCH 12/13] Maven spotless formatting
---
.../main/java/feast/core/job/dataflow/DataflowJobManager.java | 1 -
1 file changed, 1 deletion(-)
diff --git a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java
index d80d6547186..56c6f9de5f2 100644
--- a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java
+++ b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java
@@ -37,7 +37,6 @@
import feast.ingestion.options.ImportOptions;
import feast.ingestion.options.OptionCompressor;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
From 9dedca92a872808aacb93d12e9630d963aa71913 Mon Sep 17 00:00:00 2001
From: David Heryanto
Date: Wed, 26 Feb 2020 14:14:59 +0800
Subject: [PATCH 13/13] Add changelong for v0.4.6 and update versions in pom
and Helm chart to v0.4.6
---
CHANGELOG.md | 13 +++++++++++++
datatypes/java/README.md | 2 +-
infra/charts/feast/Chart.yaml | 2 +-
infra/charts/feast/README.md | 6 +++---
infra/charts/feast/charts/feast-core/Chart.yaml | 2 +-
infra/charts/feast/charts/feast-serving/Chart.yaml | 2 +-
infra/charts/feast/requirements.yaml | 6 +++---
pom.xml | 2 +-
8 files changed, 24 insertions(+), 11 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 401cabb90d9..7758ae3fb97 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,18 @@
# Changelog
+## [v0.4.6](https://github.com/gojek/feast/tree/v0.4.6) (2020-02-26)
+
+[Full Changelog](https://github.com/gojek/feast/compare/v0.4.5...v0.4.6)
+
+**Merged pull requests:**
+- Rename metric name for request latency in feast serving [\#488](https://github.com/gojek/feast/pull/488) ([davidheryanto](https://github.com/davidheryanto))
+- Allow use of secure gRPC in Feast Python client [\#459](https://github.com/gojek/feast/pull/459) ([Yanson](https://github.com/Yanson))
+- Extend WriteMetricsTransform in Ingestion to write feature value stats to StatsD [\#486](https://github.com/gojek/feast/pull/486) ([davidheryanto](https://github.com/davidheryanto))
+- Remove transaction from Ingestion [\#480](https://github.com/gojek/feast/pull/480) ([imjuanleonard](https://github.com/imjuanleonard))
+- Fix fastavro version used in Feast to avoid Timestamp delta error [\#490](https://github.com/gojek/feast/pull/490) ([davidheryanto](https://github.com/davidheryanto))
+- Fail Spotless formatting check before tests execute [\#487](https://github.com/gojek/feast/pull/487) ([ches](https://github.com/ches))
+- Reduce refresh rate of specification refresh in Serving to 10 seconds [\#481](https://github.com/gojek/feast/pull/481) ([woop](https://github.com/woop))
+
## [v0.4.5](https://github.com/gojek/feast/tree/v0.4.5) (2020-02-14)
[Full Changelog](https://github.com/gojek/feast/compare/v0.4.4...v0.4.5)
diff --git a/datatypes/java/README.md b/datatypes/java/README.md
index d5124eabb46..28b693786c8 100644
--- a/datatypes/java/README.md
+++ b/datatypes/java/README.md
@@ -16,7 +16,7 @@ Dependency Coordinates
dev.feast
datatypes-java
- 0.4.5-SNAPSHOT
+ 0.4.6-SNAPSHOT
```
diff --git a/infra/charts/feast/Chart.yaml b/infra/charts/feast/Chart.yaml
index e4ca21aa62f..35a02a7f894 100644
--- a/infra/charts/feast/Chart.yaml
+++ b/infra/charts/feast/Chart.yaml
@@ -1,4 +1,4 @@
apiVersion: v1
description: A Helm chart to install Feast on kubernetes
name: feast
-version: 0.4.5
+version: 0.4.6
diff --git a/infra/charts/feast/README.md b/infra/charts/feast/README.md
index ca526ad0b9f..a49f0132303 100644
--- a/infra/charts/feast/README.md
+++ b/infra/charts/feast/README.md
@@ -85,7 +85,7 @@ The following table lists the configurable parameters of the Feast chart and the
| `feast-core.prometheus-statsd-exporter.*` | Refer to this [link](charts/feast-core/charts/prometheus-statsd-exporter/values.yaml |
| `feast-core.replicaCount` | No of pods to create | `1`
| `feast-core.image.repository` | Repository for Feast Core Docker image | `gcr.io/kf-feast/feast-core`
-| `feast-core.image.tag` | Tag for Feast Core Docker image | `0.4.5`
+| `feast-core.image.tag` | Tag for Feast Core Docker image | `0.4.6`
| `feast-core.image.pullPolicy` | Image pull policy for Feast Core Docker image | `IfNotPresent`
| `feast-core.prometheus.enabled` | Add annotations to enable Prometheus scraping | `false`
| `feast-core.application.yaml` | Configuration for Feast Core application | Refer to this [link](charts/feast-core/values.yaml)
@@ -126,7 +126,7 @@ The following table lists the configurable parameters of the Feast chart and the
| `feast-serving-online.core.enabled` | Flag for Feast Serving to use Feast Core in the same Helm release | `true`
| `feast-serving-online.replicaCount` | No of pods to create | `1`
| `feast-serving-online.image.repository` | Repository for Feast Serving Docker image | `gcr.io/kf-feast/feast-serving`
-| `feast-serving-online.image.tag` | Tag for Feast Serving Docker image | `0.4.5`
+| `feast-serving-online.image.tag` | Tag for Feast Serving Docker image | `0.4.6`
| `feast-serving-online.image.pullPolicy` | Image pull policy for Feast Serving Docker image | `IfNotPresent`
| `feast-serving-online.prometheus.enabled` | Add annotations to enable Prometheus scraping | `true`
| `feast-serving-online.application.yaml` | Application configuration for Feast Serving | Refer to this [link](charts/feast-serving/values.yaml)
@@ -168,7 +168,7 @@ The following table lists the configurable parameters of the Feast chart and the
| `feast-serving-batch.core.enabled` | Flag for Feast Serving to use Feast Core in the same Helm release | `true`
| `feast-serving-batch.replicaCount` | No of pods to create | `1`
| `feast-serving-batch.image.repository` | Repository for Feast Serving Docker image | `gcr.io/kf-feast/feast-serving`
-| `feast-serving-batch.image.tag` | Tag for Feast Serving Docker image | `0.4.5`
+| `feast-serving-batch.image.tag` | Tag for Feast Serving Docker image | `0.4.6`
| `feast-serving-batch.image.pullPolicy` | Image pull policy for Feast Serving Docker image | `IfNotPresent`
| `feast-serving-batch.prometheus.enabled` | Add annotations to enable Prometheus scraping | `true`
| `feast-serving-batch.application.yaml` | Application configuration for Feast Serving | Refer to this [link](charts/feast-serving/values.yaml)
diff --git a/infra/charts/feast/charts/feast-core/Chart.yaml b/infra/charts/feast/charts/feast-core/Chart.yaml
index 28b3297bba9..0f437d2b6c0 100644
--- a/infra/charts/feast/charts/feast-core/Chart.yaml
+++ b/infra/charts/feast/charts/feast-core/Chart.yaml
@@ -1,4 +1,4 @@
apiVersion: v1
description: A Helm chart for core component of Feast
name: feast-core
-version: 0.4.5
+version: 0.4.6
diff --git a/infra/charts/feast/charts/feast-serving/Chart.yaml b/infra/charts/feast/charts/feast-serving/Chart.yaml
index c610474c3e5..d84d3377df2 100644
--- a/infra/charts/feast/charts/feast-serving/Chart.yaml
+++ b/infra/charts/feast/charts/feast-serving/Chart.yaml
@@ -1,4 +1,4 @@
apiVersion: v1
description: A Helm chart for serving component of Feast
name: feast-serving
-version: 0.4.5
+version: 0.4.6
diff --git a/infra/charts/feast/requirements.yaml b/infra/charts/feast/requirements.yaml
index b30635dcdb9..3ed12f08871 100644
--- a/infra/charts/feast/requirements.yaml
+++ b/infra/charts/feast/requirements.yaml
@@ -1,12 +1,12 @@
dependencies:
- name: feast-core
- version: 0.4.5
+ version: 0.4.6
condition: feast-core.enabled
- name: feast-serving
alias: feast-serving-batch
- version: 0.4.5
+ version: 0.4.6
condition: feast-serving-batch.enabled
- name: feast-serving
alias: feast-serving-online
- version: 0.4.5
+ version: 0.4.6
condition: feast-serving-online.enabled
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index df0b4a85bd8..b8310aca91d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -36,7 +36,7 @@
- 0.4.5-SNAPSHOT
+ 0.4.6-SNAPSHOT
https://github.com/gojek/feast
UTF-8