From 68e6a783f9941a4a6820b811e372fc7df2b657d9 Mon Sep 17 00:00:00 2001 From: Dainius Jocas Date: Tue, 23 Mar 2021 08:36:45 +0200 Subject: [PATCH] Replay for impact should stop processing when baseline query times out (#5) * feat: stop the RFI if baseline query times out * test: test _rank_eval query construction --- dockerfiles/Dockerfile.executable-builder | 2 +- dockerfiles/Dockerfile.test | 2 +- dockerfiles/docker-compose.es.test.yml | 2 +- dockerfiles/docker-compose.es.yml | 4 +- src/replay/impact.clj | 161 +++++++++++++--------- test/replay/impact_test.clj | 110 +++++++++++++++ 6 files changed, 208 insertions(+), 73 deletions(-) create mode 100644 test/replay/impact_test.clj diff --git a/dockerfiles/Dockerfile.executable-builder b/dockerfiles/Dockerfile.executable-builder index 233a5de..e88ccaf 100644 --- a/dockerfiles/Dockerfile.executable-builder +++ b/dockerfiles/Dockerfile.executable-builder @@ -2,7 +2,7 @@ FROM findepi/graalvm:java11-native as BUILDER ENV GRAALVM_HOME=/graalvm ENV JAVA_HOME=/graalvm -ENV CLOJURE_VERSION=1.10.2.774 +ENV CLOJURE_VERSION=1.10.3.814 RUN apt-get install -y curl \ && gu install native-image \ diff --git a/dockerfiles/Dockerfile.test b/dockerfiles/Dockerfile.test index 2b2b34e..7e401e2 100644 --- a/dockerfiles/Dockerfile.test +++ b/dockerfiles/Dockerfile.test @@ -2,7 +2,7 @@ FROM findepi/graalvm:java11-native as BUILDER ENV GRAALVM_HOME=/graalvm ENV JAVA_HOME=/graalvm -ENV CLOJURE_VERSION=1.10.2.774 +ENV CLOJURE_VERSION=1.10.3.814 RUN apt-get install -y curl \ && gu install native-image \ diff --git a/dockerfiles/docker-compose.es.test.yml b/dockerfiles/docker-compose.es.test.yml index 9298889..2320f30 100644 --- a/dockerfiles/docker-compose.es.test.yml +++ b/dockerfiles/docker-compose.es.test.yml @@ -1,7 +1,7 @@ version: '3' services: elasticsearch: - image: docker.elastic.co/elasticsearch/elasticsearch:${ES_VERSION:-7.10.2} + image: docker.elastic.co/elasticsearch/elasticsearch:${ES_VERSION:-7.11.2} environment: - discovery.type=single-node - bootstrap.memory_lock=true diff --git a/dockerfiles/docker-compose.es.yml b/dockerfiles/docker-compose.es.yml index 31775c6..d131bb7 100644 --- a/dockerfiles/docker-compose.es.yml +++ b/dockerfiles/docker-compose.es.yml @@ -1,7 +1,7 @@ version: '3' services: elasticsearch: - image: docker.elastic.co/elasticsearch/elasticsearch:${ES_VERSION:-7.10.2} + image: docker.elastic.co/elasticsearch/elasticsearch:${ES_VERSION:-7.11.2} environment: - discovery.type=single-node - bootstrap.memory_lock=true @@ -18,7 +18,7 @@ services: - 9200:9200 kibana: - image: docker.elastic.co/kibana/kibana:${ES_VERSION:-7.10.2} + image: docker.elastic.co/kibana/kibana:${ES_VERSION:-7.11.2} environment: SERVER_NAME: kibana LOGGING_QUIET: 'true' diff --git a/src/replay/impact.clj b/src/replay/impact.clj index e1c65c5..8c0130f 100644 --- a/src/replay/impact.clj +++ b/src/replay/impact.clj @@ -17,77 +17,101 @@ (defn get-index-or-alias [endpoint] (last (re-find #"^/(.*)/_search" endpoint))) -(defn prepare-endpoint [^String endpoint] +(defn prepare-endpoint + "Prepares the endpoint for PIT queries: remove preference, routing, index name." + [^String endpoint] (transform.uri/transform-uri endpoint - [{:match "preference=[^&]*&?" + [{:match "preference=[^&]*&?" ;; Remove preference string :replacement ""} - {:match "routing=[^&]*&?" + {:match "routing=[^&]*&?" ;; Remove routing parameter :replacement ""} - {:match "^(/.*)(/.*)" + {:match "^(/.*)(/.*)" ;; Remove original index because PIT doesn't allow it :replacement "$2"} - {:match "\\?$" + {:match "\\?$" ;; Remove trailing question mark :replacement ""}])) (defn generate-queries [opts query-body] (impact-transform/generate-queries query-body (get-in opts [:replay :query-transforms]))) +(defn get-baseline-resp [^String url query-body pit k] + (r/execute-request + {:url url + :body (assoc query-body :pit pit :size k) + :opts (assoc r/default-exponential-backoff-params :keywordize? true) + :method :get + :headers r/default-headers})) + +(defn get-baseline-ratings [url query-body pit k ignore-timeouts] + (let [baseline-resp (get-baseline-resp url query-body pit k)] + (when (and (:timed_out baseline-resp) (not ignore-timeouts)) + (throw (Exception. (format "Request to get baseline ratings timed-out. %s" baseline-resp)))) + (map (fn [hit] + (assoc (select-keys hit [:_index :_id]) :rating 1)) + (get-in baseline-resp [:hits :hits])))) + +(defn get-grouped-query-variations [query-body opts k] + (->> (generate-queries opts query-body) + (map (fn [qv] (update qv :request assoc :size k))) + (group-by (fn [query-variation] (json/encode (:variation query-variation)))))) + +(defn prepare-rank-eval-request [ratings grouped-variations metric pit] + {:requests (map (fn [[id [{request :request}]]] + {:id id + :request (assoc request :pit pit) + :ratings ratings}) + grouped-variations) + :metric metric}) + +(defn query-rank-eval-api [target-es-host target-index ratings grouped-variations metric pit] + (let [target-url (format "%s/%s/_rank_eval" target-es-host target-index)] + (r/execute-request + {:url target-url + :body (prepare-rank-eval-request ratings grouped-variations metric pit) + :opts (assoc r/default-exponential-backoff-params :keywordize? true) + :method :get + :headers r/default-headers}))) + +(defn construct-rfi-records [rank-eval-resp query-log-entry grouped-variations baseline-ratings k] + (let [{:keys [details failures metric_score]} rank-eval-resp + variation-ids (keys details)] + (map (fn [variation-id] + (let [query-log-entry-id (get query-log-entry :_id) + variation (first (get grouped-variations (name variation-id))) + impact {:top-k k + :variation-id (name variation-id) + :variation (map (fn [variation-map] + (update variation-map :value str)) + (:variation variation)) + :query (json/encode (:request variation)) + :failures (json/encode failures) + :impact (float (- 1 (:metric_score (variation-id details)))) + :average-impact (float (- 1 metric_score)) + :hit-count (count (get-in details [variation-id :hits])) + :unrelated-count (count (get-in details [variation-id :unrated_docs])) + :metric-score (get-in details [variation-id :metric_score]) + :original-hit-count (count baseline-ratings) + :details (json/encode (get details variation-id))}] + (-> query-log-entry + (update :_id (fn [replay-log-entry-id] (str replay-log-entry-id "-" (hash variation-id)))) + (assoc-in [:_source :query_log_entry_id] query-log-entry-id) + (assoc-in [:_source :impact] impact)))) + variation-ids))) + (defn measure-impact [opts query-log-entry] - (let [es-host (get-in opts [:replay :connection.url]) + (let [target-es-host (get-in opts [:replay :connection.url]) raw-endpoint (get-in query-log-entry [:_source :uri]) - endpoint (prepare-endpoint raw-endpoint) - target-index (get-index-or-alias raw-endpoint) - pit (assoc (pit/init es-host target-index opts) :keep_alive "30s") - query-string (get-in query-log-entry [:_source :request]) - query-body (json/decode query-string) - url (format "%s%s" es-host endpoint) + target-index (or (get-in opts [:replay :target-index]) (get-index-or-alias raw-endpoint)) k (get-in opts [:replay :top-k]) - baseline-resp (r/execute-request - {:url url - :body (assoc query-body :pit pit :size k) - :opts (assoc r/default-exponential-backoff-params :keywordize? true) - :method :get - :headers r/default-headers})] - (let [metric {:precision {:k k :relevant_rating_threshold 1 :ignore_unlabeled false}} - ratings (map (fn [hit] (assoc (select-keys hit [:_index :_id]) :rating 1)) - (get-in baseline-resp [:hits :hits])) - target-url (format "%s/%s/_rank_eval" es-host target-index) - query-variations (generate-queries opts query-body) - grouped-variations (group-by (fn [qv] (json/encode (:variation qv))) - (map (fn [qv] (update qv :request assoc :size k)) query-variations)) - rank-eval-resp (r/execute-request - {:url target-url - :body {:requests (map (fn [[id [{request :request}]]] - {:id id - :request (assoc request :pit pit) - :ratings ratings}) - grouped-variations) - :metric metric} - :opts (assoc r/default-exponential-backoff-params :keywordize? true) - :method :get - :headers r/default-headers})] - (let [{:keys [details failures metric_score]} rank-eval-resp] - (map (fn [variation-id] - (let [query-log-entry-id (get query-log-entry :_id) - variation (first (get grouped-variations (name variation-id)))] - (-> query-log-entry - (update :_id (fn [replay-log-entry-id] (str replay-log-entry-id "-" (hash variation-id)))) - (assoc-in [:_source :query_log_entry_id] query-log-entry-id) - (assoc-in [:_source :impact] {:top-k k - :variation-id (name variation-id) - :variation (map (fn [variation-map] - (update variation-map :value str)) - (:variation variation)) - :query (json/encode (:request variation)) - :failures failures - :impact (float (- 1 (:metric_score (variation-id details)))) - :average-impact (float (- 1 metric_score)) - :hit-count (count (get-in details [variation-id :hits])) - :unrelated-count (count (get-in details [variation-id :unrated_docs])) - :metric-score (get-in details [variation-id :metric_score]) - :original-hit-count (count ratings) - :details (json/encode (get details variation-id))})))) - (keys details)))))) + query-body (json/decode (get-in query-log-entry [:_source :request])) + metric {:precision {:k k :relevant_rating_threshold 1 :ignore_unlabeled false}} + pit (assoc (pit/init target-es-host target-index opts) :keep_alive "30s") + baseline-ratings-url (format "%s%s" target-es-host (prepare-endpoint raw-endpoint)) + baseline-ratings (get-baseline-ratings baseline-ratings-url query-body pit k (get-in opts [:replay :ignore-timeouts])) + grouped-variations (get-grouped-query-variations query-body opts k) + rank-eval-resp (query-rank-eval-api target-es-host target-index baseline-ratings grouped-variations metric pit)] + (println baseline-ratings) + (construct-rfi-records rank-eval-resp query-log-entry grouped-variations baseline-ratings k))) (def defaults {:max_docs 1 @@ -102,7 +126,9 @@ :top-k 10 :query-transforms [] :connection.url "http://localhost:9200" - :concurrency 1} + :target-index nil + :concurrency 1 + :ignore-timeouts false} :sink {:connection.url "http://localhost:9200" :dest.index "impact_sink_index" :batch.size 50}}) @@ -126,16 +152,15 @@ (comment (replay.impact/execute - {:max_docs 100 + {:max_docs 1 :source {:remote {:host "http://localhost:9200"} :index "query_logs" :query {:query {:bool {:filter [{:term {:query_from {:value 0}}} - {:term {:stats {:value "some value"}}} {:range {:header.timestamp {:gte "now-2d"}}} {:match {:request "multi_match"}} - {:prefix {:uri.keyword "/index_name/_search"}}] + {:prefix {:uri.keyword "/index-name/_search"}}] :must_not [{:exists {:field "query_sort"}}]}} :sort [{:header.timestamp {:order :asc}}] @@ -144,13 +169,13 @@ :replay {:connection.url "http://localhost:9200" :concurrency 10 :top-k 100 - :query-transforms [{:id "test" - :lang :sci - :script "(fn [query boost]\n (let [query-string (-> query\n (get-in [:query :bool :must])\n first\n (get-in [:constant_score :filter :multi_match :query]))\n clause-to-add {:constant_score {:boost boost\n :filter {:match {:title.folded {:_name \"boost_on_exactness\"\n :operator \"and\"\n :query query-string}}}}}]\n (update-in query [:query :bool :should] conj clause-to-add)))" - :vals [0.00001 0.0001 0.001 0.01 0.1 1 10 100 1000 10000]} + :query-transforms [{:id "jq-test" + :lang :jq + :script ". as [$query, $value] | $query | .size = $value" + :vals [1 10 100]} {:id "test2" - :lang :sci - :script "(fn [query boost] query)" + :lang :js + :script "(query, value) => { query['from'] = value; return query; }" :vals [123]}]} :sink {:connection.url "http://localhost:9200" :dest.index "impact_sink_index" diff --git a/test/replay/impact_test.clj b/test/replay/impact_test.clj new file mode 100644 index 0000000..0800e02 --- /dev/null +++ b/test/replay/impact_test.clj @@ -0,0 +1,110 @@ +(ns replay.impact-test + (:require [clojure.test :refer :all] + [replay.impact :as impact])) + +(deftest index-name-extraction + (let [uri "/index-name/_search?preference=7c5fe2d7-d313-4362-a62f-4c1e10e999fd"] + (is (= "index-name" (impact/get-index-or-alias uri))))) + +(deftest url-transformations + (let [uri "/index-name/_search?preference=7c5fe2d7-d313-4362-a62f-4c1e10e999fd"] + (is (= "/_search" (impact/prepare-endpoint uri)))) + (testing "msearch case" + (let [uri "/_msearch?preference=7c5fe2d7-d313-4362-a62f-4c1e10e999fd"] + (is (= "/_msearch" (impact/prepare-endpoint uri)))))) + +(deftest query-generation + (let [query {:query {:match_all {}}} + opts {:replay + {:query-transforms [{:id "jq-test" + :lang :jq + :script ". as [$query, $value] | $query | .size = $value" + :vals [1 10]}]}} + variations (impact/generate-queries opts query)] + (is (= 2 (count variations))) + (is (= '({:request {:query {:match_all {}} + :size 1} + :variation ({:id "jq-test" + :value 1})} + {:request {:query {:match_all {}} + :size 10} + :variation ({:id "jq-test" + :value 10})}) variations)))) + +(deftest grouped-variations + (let [k 5 + query {:query {:match_all {}}} + opts {:replay + {:query-transforms [{:id "jq-test" + :lang :jq + :script ". as [$query, $value] | $query | .size = $value" + :vals [1 10]}]}} + grouped-variations (impact/get-grouped-query-variations query opts k)] + (is (= 2 (count grouped-variations))) + (is (= {"[{\"id\":\"jq-test\",\"value\":10}]" [{:request {:query {:match_all {}} + :size 5} + :variation '({:id "jq-test" + :value 10})}] + "[{\"id\":\"jq-test\",\"value\":1}]" [{:request {:query {:match_all {}} + :size 5} + :variation '({:id "jq-test" + :value 1})}]} grouped-variations)))) + +(deftest rank-eval-request-construction + (let [k 10 + query {:query {:match_all {}}} + opts {:replay + {:query-transforms [{:id "jq-test" + :lang :jq + :script ". as [$query, $value] | $query | .size = $value" + :vals [1 10]}]}} + ratings '({:_index "index-name", :_id 1, :rating 1} + {:_index "index-name", :_id 2, :rating 1} + {:_index "index-name", :_id 3, :rating 1} + {:_index "index-name", :_id 4, :rating 1} + {:_index "index-name", :_id 5, :rating 1}) + grouped-variations (impact/get-grouped-query-variations query opts k) + metric {:precision {:k k :relevant_rating_threshold 1 :ignore_unlabeled false}} + pit "pit"] + (is (= {:metric {:precision {:ignore_unlabeled false + :k 10 + :relevant_rating_threshold 1}} + :requests (list {:id "[{\"id\":\"jq-test\",\"value\":1}]" + :ratings (list {:_id 1 + :_index "index-name" + :rating 1} + {:_id 2 + :_index "index-name" + :rating 1} + {:_id 3 + :_index "index-name" + :rating 1} + {:_id 4 + :_index "index-name" + :rating 1} + {:_id 5 + :_index "index-name" + :rating 1}) + :request {:pit "pit" + :query {:match_all {}} + :size 10}} + {:id "[{\"id\":\"jq-test\",\"value\":10}]" + :ratings (list {:_id 1 + :_index "index-name" + :rating 1} + {:_id 2 + :_index "index-name" + :rating 1} + {:_id 3 + :_index "index-name" + :rating 1} + {:_id 4 + :_index "index-name" + :rating 1} + {:_id 5 + :_index "index-name" + :rating 1}) + :request {:pit "pit" + :query {:match_all {}} + :size 10}})} + (impact/prepare-rank-eval-request ratings grouped-variations metric pit)))))