Show infinitely looping files in observability dashboard (#155)
* add problem blobs back to ingestion events query

* format IngestionEvents with prettier

* dashboard shows specific message for blobs in infinite loop

* show status 'infinite loop'; looping files appear when 'errors only' is toggled
zekehuntergreen authored Oct 9, 2023
1 parent fbbb7c5 commit 257da86
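What the dashboard side of this amounts to, per the commit message: a blob flagged as looping gets its own 'infinite loop' status rather than a generic failure, and it is listed whenever the 'errors only' filter is on. A minimal sketch of that logic, with hypothetical names (`DashboardRow`, `statusLabel`, `visible`), since the dashboard component itself is among the changed files not shown in this excerpt:

```scala
// Hypothetical sketch of the dashboard behaviour described in the commit
// message; the real dashboard component is not part of this excerpt, and
// the "failed"/"completed" labels here are illustrative.
final case class DashboardRow(infiniteLoop: Boolean, errorCount: Int)

object DashboardRow {
  // Looping blobs get a dedicated status label rather than a generic failure.
  def statusLabel(row: DashboardRow): String =
    if (row.infiniteLoop) "infinite loop"
    else if (row.errorCount > 0) "failed"
    else "completed"

  // Per the commit message, looping files also appear when the
  // "errors only" filter is toggled on.
  def visible(row: DashboardRow, errorsOnly: Boolean): Boolean =
    !errorsOnly || row.errorCount > 0 || row.infiniteLoop
}
```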
Showing 4 changed files with 448 additions and 212 deletions.
3 changes: 2 additions & 1 deletion backend/app/services/observability/Models.scala
@@ -197,7 +197,8 @@ case class BlobStatus(
   eventStatuses: List[IngestionEventStatus],
   extractorStatuses: List[ExtractorStatus],
   errors: List[IngestionErrorsWithEventType],
-  mimeTypes: Option[String])
+  mimeTypes: Option[String],
+  infiniteLoop: Boolean)
 object BlobStatus {
   implicit val dateWrites = JodaReadWrites.dateWrites
   implicit val dateReads = JodaReadWrites.dateReads
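The model change itself is one field: `BlobStatus` gains `infiniteLoop: Boolean`. Assuming `BlobStatus` is serialised with a macro-derived Play JSON format (a guess suggested by the implicit Joda reads/writes in this file, not confirmed by the excerpt), the new field reaches the API response with no further wiring. A cut-down sketch with a hypothetical stand-in class:

```scala
import play.api.libs.json.{Json, OFormat}

// Hypothetical cut-down stand-in for BlobStatus; only the fields relevant here.
final case class BlobStatusLite(mimeTypes: Option[String], infiniteLoop: Boolean)

object BlobStatusLite {
  // A macro-derived format picks up the new Boolean automatically, so the
  // JSON sent to the dashboard gains an "infiniteLoop" key with no extra code.
  implicit val format: OFormat[BlobStatusLite] = Json.format[BlobStatusLite]
}
```

For example, `Json.toJson(BlobStatusLite(None, infiniteLoop = true))` renders as `{"infiniteLoop":true}`; the `None` mime type is simply omitted.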
63 changes: 42 additions & 21 deletions backend/app/services/observability/PostgresClient.scala
@@ -157,6 +157,7 @@ class PostgresClientImpl(postgresConfig: PostgresConfig) extends PostgresClient
   ie.errors,
   ie.workspace_name AS "workspaceName",
   ie.mime_types AS "mimeTypes",
+  ie.infinite_loop AS "infiniteLoop",
   ARRAY_AGG(DISTINCT blob_metadata.path ) AS paths,
   (ARRAY_AGG(blob_metadata.file_size))[1] as "fileSize",
   ARRAY_REMOVE(ARRAY_AGG(extractor_statuses.extractor), NULL) AS extractors,
@@ -167,52 +168,72 @@
     SELECT
       blob_id,
       ingest_id,
-      MIN(EXTRACT(EPOCH from event_time)) AS ingest_start,
-      MAX(EXTRACT(EPOCH from event_time)) AS most_recent_event,
+      MIN(EXTRACT(EPOCH FROM event_time)) AS ingest_start,
+      MAX(EXTRACT(EPOCH FROM event_time)) AS most_recent_event,
       ARRAY_AGG(type) as event_types,
       ARRAY_AGG(EXTRACT(EPOCH from event_time)) as event_times,
       ARRAY_AGG(status) as event_statuses,
       ARRAY_AGG(details -> 'errors') as errors,
       (ARRAY_AGG(details ->> 'workspaceName') FILTER (WHERE details ->> 'workspaceName' IS NOT NULL))[1] as workspace_name,
-      (ARRAY_AGG(details ->> 'mimeTypes') FILTER (WHERE details ->> 'mimeTypes' IS NOT NULL))[1] as mime_types
+      (ARRAY_AGG(details ->> 'mimeTypes') FILTER (WHERE details ->> 'mimeTypes' IS NOT NULL))[1] as mime_types,
+      FALSE AS infinite_loop
     FROM ingestion_events
     WHERE ingest_id LIKE ${if(ingestIdIsPrefix) LikeConditionEscapeUtil.beginsWith(ingestId) else ingestId}
+      AND blob_id NOT IN (SELECT blob_id FROM problem_blobs)
     GROUP BY 1,2
+    UNION
+    -- blobs in the ingestion that are failing in an infinite loop
+    SELECT DISTINCT
+      blob_id,
+      ingest_id,
+      MIN(EXTRACT(EPOCH FROM event_time)) AS ingest_start,
+      MAX(EXTRACT(EPOCH FROM event_time)) AS most_recent_event,
+      array[]::text[] AS event_types,
+      array[]::numeric[] AS event_times,
+      array[]::text[] AS event_statuses,
+      array['[]'::jsonb] AS errors,
+      NULL AS workspace_name,
+      NULL AS mime_types,
+      TRUE AS infinite_loop
+    FROM ingestion_events
+    WHERE ingest_id LIKE ${if(ingestIdIsPrefix) LikeConditionEscapeUtil.beginsWith(ingestId) else ingestId}
+      AND blob_id IN (SELECT blob_id FROM problem_blobs)
+    GROUP BY 1,2
   ) AS ie
   LEFT JOIN blob_metadata USING(ingest_id, blob_id)
   LEFT JOIN extractor_statuses on extractor_statuses.blob_id = ie.blob_id and extractor_statuses.ingest_id = ie.ingest_id
-  GROUP BY 1,2,3,4,5,6,7,8,9,10
+  GROUP BY 1,2,3,4,5,6,7,8,9,10,11
   ORDER by ingest_start desc
 """.map(rs => {
   val eventTypes = rs.array("event_types").getArray.asInstanceOf[Array[String]]
   BlobStatus(
     EventMetadata(
       rs.string("blob_id"),
       rs.string("ingest_id")
     ),
     BlobStatus.parsePathsArray(rs.array("paths").getArray().asInstanceOf[Array[String]]),
     rs.longOpt("fileSize"),
     rs.stringOpt("workspaceName"),
     PostgresHelpers.postgresEpochToDateTime(rs.double("ingest_start")),
     PostgresHelpers.postgresEpochToDateTime(rs.double("most_recent_event")),
     IngestionEventStatus.parseEventStatus(
       rs.array("event_times").getArray.asInstanceOf[Array[java.math.BigDecimal]].map(t => PostgresHelpers.postgresEpochToDateTime(t.doubleValue)),
       eventTypes,
-      rs.array("event_statuses").getArray.asInstanceOf[Array[String]]),
+      rs.array("event_statuses").getArray.asInstanceOf[Array[String]]
+    ),
     rs.arrayOpt("extractors").map { extractors =>
       ExtractorStatus.parseDbStatusEvents(
         extractors.getArray().asInstanceOf[Array[String]],
         rs.array("extractorEventTimes").getArray().asInstanceOf[Array[String]],
         rs.array("extractorStatuses").getArray().asInstanceOf[Array[String]]
       )
     }.getOrElse(List()),
     IngestionError.parseIngestionErrors(
       rs.array("errors").getArray.asInstanceOf[Array[String]],
       eventTypes
     ),
-    rs.stringOpt("mimeTypes")
+    rs.stringOpt("mimeTypes"),
+    rs.boolean("infiniteLoop")
   )
 }
 ).list().apply()
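The query works by splitting the inner subquery into two UNION branches that tag their rows with a literal flag: ordinary blobs come out with `FALSE AS infinite_loop`, while blobs listed in `problem_blobs` come out with `TRUE` and empty placeholder arrays for the event columns, so the outer aggregation and the row parser handle both shapes unchanged. Stripped of the aggregates, the shape looks like this (table and column names are from the diff; the rest is illustrative):

```scala
// Distilled shape of the new subquery; the real version also aggregates
// event times, statuses and errors per blob, and filters by ingest_id.
val subqueryShape: String =
  """
    SELECT blob_id, ingest_id, FALSE AS infinite_loop
    FROM ingestion_events
    WHERE blob_id NOT IN (SELECT blob_id FROM problem_blobs)
    GROUP BY 1, 2
    UNION
    -- blobs the pipeline keeps retrying: no usable event data, just the flag
    SELECT DISTINCT blob_id, ingest_id, TRUE AS infinite_loop
    FROM ingestion_events
    WHERE blob_id IN (SELECT blob_id FROM problem_blobs)
    GROUP BY 1, 2
  """
```

Because the flag is an ordinary output column, the outer query only needs the extra `GROUP BY` position and the parser a single `rs.boolean("infiniteLoop")` read, both visible in the diff above.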