Skip to content

Commit

Permalink
fix: edge case handling in source amplitude
Browse files Browse the repository at this point in the history
  • Loading branch information
a-rampalli authored Sep 21, 2022
2 parents ac5305e + dd21b35 commit 41fc5d0
Showing 1 changed file with 25 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,20 @@ class Events(IncrementalAmplitudeStream):
state_checkpoint_interval = 1000
time_interval = {"days": 3}

# To avoid raising http exceptions by default. (HttpStream class raises http errors by default if set to True)
# Export API(Used to get events data) returns 404 when there is no data which is being propogated to UI as error in sync
@property
def raise_on_http_errors(self) -> bool:
"""
Override if needed. If set to False, allows opting-out of raising HTTP code exception.
"""
return False

def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Mapping]:
# Read records func calls this func which raises exception when 404 is sent as response
if response.status_code == 404:
self.logger.warn("No data available for the time range requested.")
return []
state_value = stream_state[self.cursor_field] if stream_state else self._start_date.strftime(self.compare_date_template)
try:
zip_file = zipfile.ZipFile(io.BytesIO(response.content))
Expand Down Expand Up @@ -231,8 +244,12 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
if response_data:
series = list(map(list, zip(*response_data["series"])))
for i, date in enumerate(response_data["xValues"]):
yield {"date": date, "statistics": dict(zip(response_data["seriesLabels"], series[i]))}

try:
yield {"date": date, "statistics": dict(zip(response_data["seriesLabels"], series[i]))}
except (IndexError,KeyError) as e:
#To avoid propogating this error to UI
self.logger.warn(e)

def path(self, **kwargs) -> str:
return f"{self.api_version}/users"

Expand All @@ -245,13 +262,17 @@ class AverageSessionLength(IncrementalAmplitudeStream):

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
response_data = response.json().get(self.data_field, [])
if response_data:
if response_data and response_data["series"]:
# From the Amplitude documentation it follows that "series" is an array with one element which is itself
# an array that contains the average session length for each day.
# https://developers.amplitude.com/docs/dashboard-rest-api#returns-2
series = response_data["series"][0]
for i, date in enumerate(response_data["xValues"]):
yield {"date": date, "length": series[i]}
try:
yield {"date": date, "length": series[i]}
except IndexError as e:
#To avoid propogating this error to UI
self.logger.warn(e)

def path(self, **kwargs) -> str:
return f"{self.api_version}/sessions/average"

0 comments on commit 41fc5d0

Please sign in to comment.