Skip to content

Commit

Permalink
Merge pull request #232 from coryodaniel/fix-watch-objects-with-message
Browse files Browse the repository at this point in the history
make sure objects with message fields are processed
  • Loading branch information
mruoss authored Mar 2, 2023
2 parents 5fd45cc + d78558f commit afeac1b
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 19 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

<!-- ### Added | Changed | Deprecated | Removed | Fixed | Security -->

### Fixed

- Watcher reset resource version for objects with a "message" field. - [#232](https://github.com/coryodaniel/k8s/issues/232), [#231](https://github.com/coryodaniel/k8s/issues/231)

<!--------------------- Don't add new entries after this line --------------------->

## [2.1.0] - 2023-02-25
Expand Down
36 changes: 19 additions & 17 deletions lib/k8s/client/runner/stream/watch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -167,19 +167,6 @@ defmodule K8s.Client.Runner.Stream.Watch do
{[], new_state}
end

defp process_object(%{"object" => %{"message" => message} = object}, state) do
Logger.error(
log_prefix(
"Erronous event received from watcher: #{message} - resetting the resource version"
),
library: :k8s,
object: object
)

new_state = struct!(state, resource_version: nil)
{[], new_state}
end

defp process_object(%{"type" => "BOOKMARK", "object" => object}, state) do
Logger.debug(
log_prefix("Bookmark received"),
Expand All @@ -192,17 +179,32 @@ defmodule K8s.Client.Runner.Stream.Watch do

defp process_object(
%{"object" => %{"metadata" => %{"resourceVersion" => resource_version}}},
state
)
when resource_version == state.resource_version do
%{resource_version: resource_version} = state
) do
# resource version already obeserved.
{[], state}
end

defp process_object(%{"object" => object} = new_event, state) do
defp process_object(%{"object" => %{"kind" => _} = object} = new_event, state) do
# Emit new event
new_state = struct!(state, resource_version: object["metadata"]["resourceVersion"])
{[new_event], new_state}
end

defp process_object(%{"object" => %{"message" => message} = object}, state) do
# Objects with only the "message" field but no "kind" are cosidered errors.
Logger.error(
log_prefix(
"Erronous event received from watcher: #{message} - resetting the resource version"
),
library: :k8s,
object: object
)

new_state = struct!(state, resource_version: nil)
{[], new_state}
end

@spec get_resource_version(K8s.Conn.t(), K8s.Operation.t()) :: {:ok, binary} | Base.error_t()
defp get_resource_version(%K8s.Conn{} = conn, %K8s.Operation{} = operation) do
with {:ok, payload} <- Base.run(conn, operation) do
Expand Down
13 changes: 11 additions & 2 deletions test/k8s/client/runner/stream/watch_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,14 @@ defmodule K8s.Client.Runner.Stream.WatchTest do
"metadata" => %{"resourceVersion" => "11"}
}
}),
HTTPTestHelper.stream_object(%{
"type" => "MODIFIED",
"object" => %{
"apiVersion" => "apps/v1",
"kind" => "ReplicaSet",
"message" => "some message"
}
}),
HTTPTestHelper.stream_object(%{
"type" => "ERROR",
"object" => %{
Expand Down Expand Up @@ -535,10 +543,11 @@ defmodule K8s.Client.Runner.Stream.WatchTest do

events =
stream
|> Stream.take(4)
|> Stream.take(6)
|> Enum.to_list()

assert ["ADDED", "DELETED", "ADDED", "DELETED"] == Enum.map(events, & &1["type"])
assert ["ADDED", "MODIFIED", "DELETED", "ADDED", "MODIFIED", "DELETED"] ==
Enum.map(events, & &1["type"])
end

assert capture_log(test) =~ "Erronous event received"
Expand Down

0 comments on commit afeac1b

Please sign in to comment.