Skip to content

Commit

Permalink
adinsights async extraction support query splitting by time ranges de…
Browse files Browse the repository at this point in the history
…finitions
  • Loading branch information
kacurez committed Oct 22, 2024
1 parent 0fda8fa commit 6a83cd3
Show file tree
Hide file tree
Showing 2 changed files with 176 additions and 5 deletions.
57 changes: 53 additions & 4 deletions src/keboola/facebook/extractor/query.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
[keboola.docker.config :as docker-config]
[keboola.docker.runtime :as runtime]
[keboola.facebook.api.request :as request]
[keboola.facebook.extractor.query-parser :refer :all]
[keboola.facebook.extractor.output :as output]
[clojure.string :as string]))
[clojure.string :as string])
(:import [java.time LocalDate]
[java.time.format DateTimeFormatter]))
(defn incremental?
"Returns false if the given query is not incremental, true otherwise."
[query]
Expand Down Expand Up @@ -100,15 +103,61 @@
(query-path-posts? query)
(query-need-userinfo? query))))

(defn run-async-insights-query [token out-dir name query version]
(defn make-run-inishgts-query-with-time-range [async-insights-request-fn token id parameters-str version query]
(let [parameters-map (parse-parameters parameters-str)
time-ranges-str (get parameters-map "time_ranges")
date-preset (get parameters-map "date_preset")]
(cond
;; If time_ranges is present, split into daily queries
time-ranges-str
(let [time-ranges (parse-time-ranges time-ranges-str)
expanded-time-ranges (expand-time-ranges time-ranges)
run-query-for-time-range (fn [time-range]
(let [new-parameters (generate-parameters-for-dates parameters-map time-range)]
(async-insights-request-fn token id new-parameters version query)))]
(runtime/log-strings "Splitting query by time_ranges: " {:time_ranges expanded-time-ranges})
(mapcat run-query-for-time-range expanded-time-ranges))
;; If date_preset is one of last_3d, last_7d, last_30d, split into daily queries
(some #{date-preset} ["last_3d" "last_7d" "last_30d"])
(let [days (date-preset-to-days date-preset)
end-date (.format (LocalDate/now) (DateTimeFormatter/ofPattern "yyyy-MM-dd"))
start-date (get-past-date days)
expanded-time-ranges (generate-date-ranges start-date end-date)
run-query-for-time-range (fn [time-range]
(let [new-parameters (generate-parameters-for-dates parameters-map time-range)]
(async-insights-request-fn token id new-parameters version query)))]
(runtime/log-strings "Splitting query by date_preset: " date-preset {:time_ranges expanded-time-ranges})
(mapcat run-query-for-time-range expanded-time-ranges))
;; Else, use the parameters as is
:else
(do
(runtime/log-strings "Running query with parameters as is even though split-query-time-range-by-day = true")
(async-insights-request-fn token id parameters-str version query)))))

;; Runs the async insights query, splitting it into daily queries based on time_ranges or date_preset.
(defn run-async-insights-query-with-splitting [token out-dir name query version]
(let [ids-str (:ids query)
parameters (:parameters query)
run-query (fn [id] (request/async-insights-request token id parameters version query))
ids-seq (s/split ids-str #",")
all-merged-queries-rows (mapcat #(run-query %) ids-seq)
run-query (fn [id] (make-run-inishgts-query-with-time-range request/async-insights-request token id parameters version query))
all-merged-queries-rows (mapcat run-query ids-seq)
all-rows (apply concat all-merged-queries-rows)]
(output/write-rows all-rows out-dir name true (incremental? query))))

;; Runs the async insights query, optionally splitting it into daily queries based on the query configuration.
(defn run-async-insights-query [token out-dir name query version]
(let [split-query? (:split-query-time-range-by-day query)]
(if split-query?
(run-async-insights-query-with-splitting token out-dir name query version)
;; Else, use the parameters as is
(let [ids-str (:ids query)
parameters (:parameters query)
ids-seq (s/split ids-str #",")
run-query (fn [id] (request/async-insights-request token id parameters version query))
all-merged-queries-rows (mapcat run-query ids-seq)
all-rows (apply concat all-merged-queries-rows)]
(output/write-rows all-rows out-dir name true (incremental? query))))))

(defn run-query [query all-ids credentials out-dir]
(runtime/log-strings "Run query:" query)
(let [token (docker-config/get-fb-token credentials)
Expand Down
124 changes: 123 additions & 1 deletion test/keboola/facebook/extractor/query_test.clj
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
(ns keboola.facebook.extractor.query-test
(:require [keboola.facebook.extractor.query :as sut]
[keboola.facebook.extractor.query-parser :as query-parser]
[keboola.test-utils.core :as test-utils]
[clojure.java.io :as io]
[keboola.facebook.api.request-test :refer [media-posted-before-error-response]]
[clojure.test :refer :all])
(:use clj-http.fake))
(:use clj-http.fake)
(:import [java.time LocalDate]
[java.time.format DateTimeFormatter]))

(def ^:dynamic *tmpdir* "")

Expand Down Expand Up @@ -67,3 +70,122 @@
(is (empty-dir? *tmpdir*)))))

(use-fixtures :once setup-tmpdir)



(deftest test-make-run-insights-query-with-time-range
(testing "date_preset=last_3d"
;; Placeholder values for unimportant parameters
(let [token "token"
id "id"
version "version"
query {}
;; Parameters string as per your example
parameters-str "time_increment=1&breakdowns=product_id&date_preset=last_3d"
;; Atom to capture calls to the mock function
async-insights-request-calls (atom [])
;; Mock function to capture calls and parameters
mock-async-insights-request-fn (fn [token id new-parameters version query]
(swap! async-insights-request-calls conj
{:token token
:id id
:parameters new-parameters
:version version
:query query})
;; Return a mock result
[{:data "mock-result"}])]

;; Call the function under test
(let [results (sut/make-run-inishgts-query-with-time-range
mock-async-insights-request-fn
token
id
parameters-str
version
query)
;; Capture the calls made to the mock function
calls @async-insights-request-calls
num-calls (count calls)
;; Compute expected dates based on current date
date-preset "last_3d"
days (query-parser/date-preset-to-days date-preset) ;; Should be 3 for "last_3d"
formatter (DateTimeFormatter/ofPattern "yyyy-MM-dd")
today (LocalDate/now)
start-date (query-parser/get-past-date days)
;; Generate dates including today
dates (map #(.format % formatter)
(query-parser/date-range start-date (.format today formatter)))
expected-time-ranges (map (fn [date]
{:since date
:until date})
dates)]

;; Verify the number of calls made matches expected number of dates
(is (= num-calls (count expected-time-ranges))
(str "Expected " (count expected-time-ranges) " calls to async-insights-request-fn, but got " num-calls))

;; Parse the parameters once for use in generating expected parameters
(let [parameters-map (query-parser/parse-parameters parameters-str)]

;; Verify each call was made with the correct parameters
(doseq [[call expected-time-range] (map vector calls expected-time-ranges)]
(let [expected-parameters (query-parser/generate-parameters-for-dates parameters-map expected-time-range)]
(is (= (:parameters call) expected-parameters)
(str "Expected parameters: " expected-parameters ", but got: " (:parameters call))))))

;; Optionally, verify that results contain the expected data
(is (= results (apply concat (repeat num-calls [{:data "mock-result"}])))
"Expected results to contain the mock data repeated for each call"))))
(testing "time_ranges=[{'since':'2024-08-10','until':'2024-08-12'}]"
;; Placeholder values for unimportant parameters
(let [token "token"
id "id"
version "version"
query {}
;; Parameters string as per your example
parameters-str "time_increment=1&breakdowns=product_id&time_ranges=[{'since':'2024-08-10','until':'2024-08-12'}]"
;; Atom to capture calls to the mock function
async-insights-request-calls (atom [])
;; Mock function to capture calls and parameters
mock-async-insights-request-fn (fn [token id new-parameters version query]
(swap! async-insights-request-calls conj
{:token token
:id id
:parameters new-parameters
:version version
:query query})
;; Return a mock result
[{:data "mock-result"}])]

;; Call the function under test
(let [results (sut/make-run-inishgts-query-with-time-range
mock-async-insights-request-fn
token
id
parameters-str
version
query)
;; Capture the calls made to the mock function
calls @async-insights-request-calls
num-calls (count calls)
;; Compute expected dates based on current date
time-ranges-str "[{'since':'2024-08-10','until':'2024-08-12'}]"
time-ranges (query-parser/parse-time-ranges time-ranges-str)
expected-time-ranges (query-parser/expand-time-ranges time-ranges)]

;; Verify the number of calls made matches expected number of dates
(is (= num-calls (count expected-time-ranges))
(str "Expected " (count expected-time-ranges) " calls to async-insights-request-fn, but got " num-calls))

;; Parse the parameters once for use in generating expected parameters
(let [parameters-map (query-parser/parse-parameters parameters-str)]

;; Verify each call was made with the correct parameters
(doseq [[call expected-time-range] (map vector calls expected-time-ranges)]
(let [expected-parameters (query-parser/generate-parameters-for-dates parameters-map expected-time-range)]
(is (= (:parameters call) expected-parameters)
(str "Expected parameters: " expected-parameters ", but got: " (:parameters call))))))

;; Optionally, verify that results contain the expected data
(is (= results (apply concat (repeat num-calls [{:data "mock-result"}])))
"Expected results to contain the mock data repeated for each call")))))

0 comments on commit 6a83cd3

Please sign in to comment.