Skip to content

Commit

Permalink
Merge pull request #123 from keboola/kacurez-granular-by-day-extracti…
Browse files Browse the repository at this point in the history
…on-ST-2348

async insights extraction: support split query by day
  • Loading branch information
kacurez authored Oct 25, 2024
2 parents 4a302bc + 6a83cd3 commit 6f78fb6
Show file tree
Hide file tree
Showing 4 changed files with 455 additions and 5 deletions.
57 changes: 53 additions & 4 deletions src/keboola/facebook/extractor/query.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
[keboola.docker.config :as docker-config]
[keboola.docker.runtime :as runtime]
[keboola.facebook.api.request :as request]
[keboola.facebook.extractor.query-parser :refer :all]
[keboola.facebook.extractor.output :as output]
[clojure.string :as string]))
[clojure.string :as string])
(:import [java.time LocalDate]
[java.time.format DateTimeFormatter]))
(defn incremental?
"Returns false if the given query is not incremental, true otherwise."
[query]
Expand Down Expand Up @@ -100,15 +103,61 @@
(query-path-posts? query)
(query-need-userinfo? query))))

(defn run-async-insights-query [token out-dir name query version]
(defn make-run-inishgts-query-with-time-range [async-insights-request-fn token id parameters-str version query]
(let [parameters-map (parse-parameters parameters-str)
time-ranges-str (get parameters-map "time_ranges")
date-preset (get parameters-map "date_preset")]
(cond
;; If time_ranges is present, split into daily queries
time-ranges-str
(let [time-ranges (parse-time-ranges time-ranges-str)
expanded-time-ranges (expand-time-ranges time-ranges)
run-query-for-time-range (fn [time-range]
(let [new-parameters (generate-parameters-for-dates parameters-map time-range)]
(async-insights-request-fn token id new-parameters version query)))]
(runtime/log-strings "Splitting query by time_ranges: " {:time_ranges expanded-time-ranges})
(mapcat run-query-for-time-range expanded-time-ranges))
;; If date_preset is one of last_3d, last_7d, last_30d, split into daily queries
(some #{date-preset} ["last_3d" "last_7d" "last_30d"])
(let [days (date-preset-to-days date-preset)
end-date (.format (LocalDate/now) (DateTimeFormatter/ofPattern "yyyy-MM-dd"))
start-date (get-past-date days)
expanded-time-ranges (generate-date-ranges start-date end-date)
run-query-for-time-range (fn [time-range]
(let [new-parameters (generate-parameters-for-dates parameters-map time-range)]
(async-insights-request-fn token id new-parameters version query)))]
(runtime/log-strings "Splitting query by date_preset: " date-preset {:time_ranges expanded-time-ranges})
(mapcat run-query-for-time-range expanded-time-ranges))
;; Else, use the parameters as is
:else
(do
(runtime/log-strings "Running query with parameters as is even though split-query-time-range-by-day = true")
(async-insights-request-fn token id parameters-str version query)))))

;; Runs the async insights query, splitting it into daily queries based on time_ranges or date_preset.
(defn run-async-insights-query-with-splitting [token out-dir name query version]
(let [ids-str (:ids query)
parameters (:parameters query)
run-query (fn [id] (request/async-insights-request token id parameters version query))
ids-seq (s/split ids-str #",")
all-merged-queries-rows (mapcat #(run-query %) ids-seq)
run-query (fn [id] (make-run-inishgts-query-with-time-range request/async-insights-request token id parameters version query))
all-merged-queries-rows (mapcat run-query ids-seq)
all-rows (apply concat all-merged-queries-rows)]
(output/write-rows all-rows out-dir name true (incremental? query))))

;; Runs the async insights query, optionally splitting it into daily queries based on the query configuration.
(defn run-async-insights-query [token out-dir name query version]
(let [split-query? (:split-query-time-range-by-day query)]
(if split-query?
(run-async-insights-query-with-splitting token out-dir name query version)
;; Else, use the parameters as is
(let [ids-str (:ids query)
parameters (:parameters query)
ids-seq (s/split ids-str #",")
run-query (fn [id] (request/async-insights-request token id parameters version query))
all-merged-queries-rows (mapcat run-query ids-seq)
all-rows (apply concat all-merged-queries-rows)]
(output/write-rows all-rows out-dir name true (incremental? query))))))

(defn run-query [query all-ids credentials out-dir]
(runtime/log-strings "Run query:" query)
(let [token (docker-config/get-fb-token credentials)
Expand Down
85 changes: 85 additions & 0 deletions src/keboola/facebook/extractor/query_parser.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
(ns keboola.facebook.extractor.query-parser
(:gen-class)
(:require
[clojure.string :as s]
[clojure.data.json :as json])
(:import [java.time LocalDate]
[java.time.format DateTimeFormatter]))


;; Parses the parameters string into a map.
(defn parse-parameters [params-str]
(if (empty? params-str)
{}
(->> (s/split params-str #"&")
(map #(s/split % #"=" 2))
(map (fn [[k v]] [k v]))
(into {}))))

;; Parses the time_ranges parameter into a vector of maps.
(defn parse-time-ranges [time-ranges-str]
(if (empty? time-ranges-str)
[]
(let [json-str (s/replace time-ranges-str #"'" "\"")]
(json/read-str json-str :key-fn keyword))))

;; Generates a sequence of dates from start-date to end-date exclusive
(defn date-range [start-date end-date]
(let [formatter (DateTimeFormatter/ofPattern "yyyy-MM-dd")
start (LocalDate/parse start-date formatter)
end (LocalDate/parse end-date formatter)]
(cond
;; If start date is before end date, return dates up to but not including end date
(.isBefore start end)
(take-while (fn [date] (.isBefore date end))
(iterate #(.plusDays % 1) start))
;; If start date equals end date, return a sequence containing that date
(.isEqual start end)
[start]
;; If start date is after end date, return an empty sequence
:else
[])))

;; Generates a list of time ranges for each day within the given date range.
(defn generate-date-ranges [start-date end-date]

(let [dates (date-range start-date end-date)]
(map (fn [date]
{:since (.toString date)
:until (.toString date)})
dates)))

;; Expands all time ranges into individual day ranges.
(defn expand-time-ranges [time-ranges]
(mapcat (fn [time-range]
(generate-date-ranges (:since time-range) (:until time-range)))
time-ranges))

;; Generates a new parameters string with the updated time_range.
(defn generate-parameters-for-dates [parameters-map time-range]
(let [time-ranges-json (json/write-str [time-range] :value-fn (fn [k v] (if (string? v) v (str v))))
;; Replace double quotes with single quotes
time-ranges-json-single-quotes (s/replace time-ranges-json #"\"" "'")
params-with-new-time-range (-> parameters-map
(dissoc "date_preset")
(assoc "time_ranges" time-ranges-json-single-quotes))]
(->> params-with-new-time-range
(map (fn [[k v]] (str k "=" v)))
(s/join "&"))))

;; Converts date_preset values to the number of days.
(defn date-preset-to-days [date-preset]
(case date-preset
"last_3d" 3
"last_7d" 7
"last_30d" 30
nil))

;;"Returns the date string for 'days' days ago from the given reference date.
;; If reference-date is not provided, uses LocalDate/now."
(defn get-past-date
([days]
(get-past-date days (LocalDate/now)))
([days reference-date]
(let [formatter (DateTimeFormatter/ofPattern "yyyy-MM-dd")]
(.format (.minusDays reference-date days) formatter))))
194 changes: 194 additions & 0 deletions test/keboola/facebook/extractor/query_parser_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
(ns keboola.facebook.extractor.query-parser-test
(:require [clojure.test :refer :all]
[keboola.facebook.extractor.query-parser :as sut])
(:import [java.time LocalDate]
[java.time.format DateTimeParseException]
[java.time.format DateTimeFormatter]))


(deftest test-parse-parameters
(testing "parse-parameters function"
;; Test with multiple parameters
(is (= (sut/parse-parameters "param1=value1&param2=value2")
{"param1" "value1", "param2" "value2"}))
(is (= (sut/parse-parameters "time_ranges=value1&date_preset=last_3d")
{"time_ranges" "value1", "date_preset" "last_3d"}))
;; ;; Test with a single parameter
(is (= (sut/parse-parameters "param1=value1")
{"param1" "value1"}))
;; ;; Test with empty string
(is (= {}
(sut/parse-parameters "")))
;; Test with parameter without value
(is (= (sut/parse-parameters "param1=&param2=value2")
{"param1" "", "param2" "value2"}))
;; Test with duplicate parameter names
(is (= (sut/parse-parameters "param1=value1&param1=value2")
{"param1" "value2"})) ;; The last value should overwrite
;; Test with value containing equal sign
(is (= (sut/parse-parameters "param1=value1=value2")
{"param1" "value1=value2"}))
;; Test with parameter without equal sign
(is (= (sut/parse-parameters "param1")
{"param1" nil}))
;; Test with parameters having empty keys or values
(is (= (sut/parse-parameters "=value1&param2=")
{"" "value1", "param2" ""}))))


(deftest test-parse-time-ranges
(testing "parse-time-ranges function"
;; Test with single time range
(is (= [{:since "2024-08-10", :until "2024-08-13"}]
(sut/parse-time-ranges "[{'since':'2024-08-10','until':'2024-08-13'}]")))
;; Test with multiple time ranges
(is (= [{:since "2024-08-10", :until "2024-08-10"}
{:since "2024-08-11", :until "2024-08-11"}
{:since "2024-08-12", :until "2024-08-12"}
{:since "2024-08-13", :until "2024-08-13"}]
(sut/parse-time-ranges "[{'since':'2024-08-10','until':'2024-08-10'},{'since':'2024-08-11','until':'2024-08-11'},{'since':'2024-08-12','until':'2024-08-12'},{'since':'2024-08-13','until':'2024-08-13'}]")))
;; Test with empty string
(is (= []
(sut/parse-time-ranges "")))
;; Test with invalid JSON (should throw an exception)
(is (thrown? Exception
(sut/parse-time-ranges "[{'since':'2024-08-10','until':'2024-08-13'")))
;; Test with nil input (should return nil)
(is (= []
(sut/parse-time-ranges nil)))))



(deftest test-date-range
(testing "date-range function"
;; Test with start and end dates being the same
(is (= [(LocalDate/parse "2024-08-10")]
(sut/date-range "2024-08-10" "2024-08-10")))
;; Test with start date before end date
(is (= [(LocalDate/parse "2024-08-10")
(LocalDate/parse "2024-08-11")
(LocalDate/parse "2024-08-12")]
(sut/date-range "2024-08-10" "2024-08-13")))
;; Test with start date after end date (should return an empty sequence)
(is (= []
(sut/date-range "2024-08-13" "2024-08-10")))
;; Test with invalid date format (should throw an exception)
(is (thrown? DateTimeParseException
(sut/date-range "2024/08/10" "2024/08-13")))
;; Test with nil inputs (should throw an exception)
(is (thrown? NullPointerException
(sut/date-range nil "2024-08-13")))
(is (thrown? NullPointerException
(sut/date-range "2024-08-10" nil)))
;; Test with large date range
(is (= (map #(LocalDate/parse %)
["2024-01-01" "2024-01-02" "2024-01-03" "2024-01-04"])
(sut/date-range "2024-01-01" "2024-01-05")))))

(deftest test-expand-time-ranges
(testing "expand-time-ranges function"
;; Test with a single time range
(let [input [{:since "2024-08-10" :until "2024-08-13"}]
expected [{:since "2024-08-10" :until "2024-08-10"}
{:since "2024-08-11" :until "2024-08-11"}
{:since "2024-08-12" :until "2024-08-12"}]]
(is (= expected (sut/expand-time-ranges input))))
;; Test with multiple time ranges
(let [input [{:since "2024-08-10" :until "2024-08-11"}
{:since "2024-08-13" :until "2024-08-14"}]
expected [{:since "2024-08-10" :until "2024-08-10"}
{:since "2024-08-13" :until "2024-08-13"}]]
(is (= expected (sut/expand-time-ranges input))))
;; Test with start date equal to end date
(let [input [{:since "2024-08-10" :until "2024-08-10"}]
expected [{:since "2024-08-10" :until "2024-08-10"}]]
(is (= expected (sut/expand-time-ranges input))))
;; Test with start date after end date (should return empty list)
(let [input [{:since "2024-08-13" :until "2024-08-10"}]
expected []]
(is (= expected (sut/expand-time-ranges input))))
;; Test with empty input
(let [input []
expected []]
(is (= expected (sut/expand-time-ranges input))))))


(deftest test-generate-parameters-for-dates
(testing "generate-parameters-for-dates function with single quotes"
;; Test with basic parameters and time-range
(let [parameters-map {"time_increment" "1"
"breakdowns" "product_id"
"date_preset" "last_7d"}
time-range {:since "2024-08-10" :until "2024-08-10"}
expected "time_increment=1&breakdowns=product_id&time_ranges=[{'since':'2024-08-10','until':'2024-08-10'}]"]
(is (= expected (sut/generate-parameters-for-dates parameters-map time-range))))

;; Test with parameters containing existing time_ranges (should replace time_ranges)
(let [parameters-map {"time_increment" "1"
"breakdowns" "product_id"
"time_ranges" "[{'since':'2024-08-10','until':'2024-08-13'}]"}
time-range {:since "2024-08-10" :until "2024-08-13"}
expected "time_increment=1&breakdowns=product_id&time_ranges=[{'since':'2024-08-10','until':'2024-08-13'}]"]
(is (= expected (sut/generate-parameters-for-dates parameters-map time-range))))

;; Test with parameters without date_preset
(let [parameters-map {"time_increment" "1"
"breakdowns" "product_id"}
time-range {:since "2024-08-10" :until "2024-08-10"}
expected "time_increment=1&breakdowns=product_id&time_ranges=[{'since':'2024-08-10','until':'2024-08-10'}]"]
(is (= expected (sut/generate-parameters-for-dates parameters-map time-range))))

;; Test with parameters containing date_preset and other parameters
(let [parameters-map {"time_increment" "1"
"breakdowns" "product_id"
"date_preset" "last_3d"
"level" "ad"}
time-range {:since "2024-08-10" :until "2024-08-12"}
expected "time_increment=1&breakdowns=product_id&level=ad&time_ranges=[{'since':'2024-08-10','until':'2024-08-12'}]"]
(is (= expected (sut/generate-parameters-for-dates parameters-map time-range))))

;; Test with empty parameters-map
(let [parameters-map {}
time-range {:since "2024-08-10" :until "2024-08-10"}
expected "time_ranges=[{'since':'2024-08-10','until':'2024-08-10'}]"]
(is (= expected (sut/generate-parameters-for-dates parameters-map time-range))))))


(deftest test-date-preset-to-days
(testing "date-preset-to-days function"
;; Test with known date_preset values
(is (= 3 (sut/date-preset-to-days "last_3d")))
(is (= 7 (sut/date-preset-to-days "last_7d")))
(is (= 30 (sut/date-preset-to-days "last_30d")))
;; Test with unknown date_preset value
(is (= nil (sut/date-preset-to-days "last_5d")))
;; Test with nil input
(is (= nil (sut/date-preset-to-days nil)))
;; Test with empty string
(is (= nil (sut/date-preset-to-days "")))))


(deftest test-get-past-date
(testing "get-past-date function with reference-date parameter"
(let [formatter (DateTimeFormatter/ofPattern "yyyy-MM-dd")
reference-date (LocalDate/of 2024 10 22)] ; Fixed reference date: 2024-10-22
;; Test with 0 days ago (should return the reference date)
(is (= "2024-10-22" (sut/get-past-date 0 reference-date)))
;; Test with 1 day ago
(is (= "2024-10-21" (sut/get-past-date 1 reference-date)))
;; Test with 7 days ago
(is (= "2024-10-15" (sut/get-past-date 7 reference-date)))
;; Test with 3 days ago
(is (= "2024-10-19" (sut/get-past-date 3 reference-date)))
;; Test with negative days (future date)
(is (= "2024-10-25" (sut/get-past-date -3 reference-date)))
;; Test with large number of days
(is (= "2023-10-22" (sut/get-past-date 366 reference-date))))

(testing "get-past-date function without reference-date (uses LocalDate/now)"
;; Since LocalDate/now is used, this test is date-sensitive.
;; We'll test that the function returns the same date as calculated here.
(let [days 5
formatter (DateTimeFormatter/ofPattern "yyyy-MM-dd")
expected-date (.format (.minusDays (LocalDate/now) days) formatter)]
(is (= expected-date (sut/get-past-date days)))))))
Loading

0 comments on commit 6f78fb6

Please sign in to comment.