Skip to content

Commit 5a4cfdc

Browse files
Improve db exception handling on websocket (#3131)
Co-authored-by: Jonatan Kłosko <jonatanklosko@gmail.com>
1 parent 409586c commit 5a4cfdc

File tree

6 files changed

+152
-10
lines changed

6 files changed

+152
-10
lines changed

config/test.exs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,5 @@ config :livebook,
4545
}}
4646

4747
config :livebook, Livebook.Apps.Manager, retry_backoff_base_ms: 0
48+
49+
config :livebook, teams_connection_backoff_range_ms: 0..0

lib/livebook/hubs/team_client.ex

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,12 @@ defmodule Livebook.Hubs.TeamClient do
398398
{:noreply, %{state | connected?: false, connection_status: reason}}
399399
end
400400

401+
def handle_info({:service_unavailable, reason}, state) do
402+
Hubs.Broadcasts.hub_connection_failed(state.hub.id, reason)
403+
404+
{:noreply, %{state | connected?: false, connection_status: reason}}
405+
end
406+
401407
def handle_info({:server_error, reason}, state) do
402408
Hubs.Broadcasts.hub_server_error(state.hub.id, "#{state.hub.hub_name}: #{reason}")
403409
:ok = Hubs.delete_hub(state.hub.id)

lib/livebook/teams/connection.ex

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,14 +50,26 @@ defmodule Livebook.Teams.Connection do
5050
send(data.listener, {:connection_error, reason})
5151
Logger.warning("Teams WebSocket connection - transport error: #{inspect(reason)}")
5252

53-
# Random between 3 and 10 seconds
54-
backoff = Enum.random(3..10) * 1000
55-
{:keep_state_and_data, {{:timeout, :backoff}, backoff, nil}}
53+
{:keep_state_and_data, {{:timeout, :backoff}, backoff_ms(), nil}}
5654

57-
{:server_error, error} ->
58-
reason = LivebookProto.Error.decode(error).details
55+
{:server_error, 503, body} ->
56+
reason = decode_error_reason(body)
57+
send(data.listener, {:service_unavailable, reason})
58+
59+
Logger.warning(
60+
"Teams WebSocket connection - server error - http status 503: #{inspect(reason)}"
61+
)
62+
63+
{:keep_state_and_data, {{:timeout, :backoff}, backoff_ms(), nil}}
64+
65+
{:server_error, status, body} ->
66+
reason = decode_error_reason(body)
5967
send(data.listener, {:server_error, reason})
60-
Logger.warning("Teams WebSocket connection - server error: #{inspect(reason)}")
68+
69+
Logger.warning(
70+
"Teams WebSocket connection - server error - http status #{status}: #{inspect(reason)}"
71+
)
72+
6173
{:keep_state, data}
6274
end
6375
end
@@ -152,4 +164,17 @@ defmodule Livebook.Teams.Connection do
152164
defp ensure_closed(data) do
153165
_ = WebSocket.disconnect(data.http_conn, data.websocket, data.ref)
154166
end
167+
168+
defp decode_error_reason(body) do
169+
LivebookProto.Error.decode(body).details
170+
rescue
171+
error in Protobuf.DecodeError ->
172+
"Server error (unexpected response format), error: #{Exception.message(error)}"
173+
end
174+
175+
defp backoff_ms do
176+
# Random between 3 and 10 seconds
177+
range = Application.get_env(:livebook, :teams_connection_backoff_range_ms, 3_000..10_000)
178+
Enum.random(range)
179+
end
155180
end

lib/livebook/teams/web_socket.ex

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ defmodule Livebook.Teams.WebSocket do
1616
@spec connect(list({String.t(), String.t()})) ::
1717
{:ok, conn(), websocket(), ref()}
1818
| {:transport_error, String.t()}
19-
| {:server_error, String.t()}
19+
| {:server_error, integer(), String.t()}
2020
def connect(headers \\ []) do
2121
uri = URI.parse(Livebook.Config.teams_url())
2222
{http_scheme, ws_scheme} = parse_scheme(uri)
@@ -77,9 +77,9 @@ defmodule Livebook.Teams.WebSocket do
7777
%{body: []} ->
7878
handle_upgrade_responses(responses, conn, ref, state)
7979

80-
%{status: _} ->
80+
%{status: status} ->
8181
Mint.HTTP.close(conn)
82-
{:server_error, state.body |> Enum.reverse() |> IO.iodata_to_binary()}
82+
{:server_error, status, state.body |> Enum.reverse() |> IO.iodata_to_binary()}
8383
end
8484
end
8585

@@ -98,7 +98,7 @@ defmodule Livebook.Teams.WebSocket do
9898

9999
{:error, conn, %UpgradeFailureError{}} ->
100100
Mint.HTTP.close(conn)
101-
{:server_error, state.body |> Enum.reverse() |> IO.iodata_to_binary()}
101+
{:server_error, state.status, state.body |> Enum.reverse() |> IO.iodata_to_binary()}
102102

103103
{:error, conn, exception} ->
104104
Mint.HTTP.close(conn)
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
defmodule Livebook.Teams.ConnectionTest do
2+
use ExUnit.Case, async: false
3+
4+
alias Livebook.Teams.Connection
5+
6+
@moduletag :capture_log
7+
8+
setup do
9+
original_url = Application.get_env(:livebook, :teams_url)
10+
11+
on_exit(fn ->
12+
Application.put_env(:livebook, :teams_url, original_url)
13+
end)
14+
15+
:ok
16+
end
17+
18+
describe "server error handling" do
19+
test "handles invalid protobuf without crashing" do
20+
html_error = """
21+
<!DOCTYPE html>
22+
<html><body><h1>500 Internal Server Error</h1></body></html>
23+
"""
24+
25+
start_error_server(500, html_error, "text/html")
26+
27+
{:ok, conn_pid} = Connection.start_link(self(), [{"x-test", "true"}])
28+
29+
assert_receive {:server_error, error_message}, 5000
30+
assert error_message =~ "Server error (unexpected response format)"
31+
assert Process.alive?(conn_pid)
32+
end
33+
34+
test "sends :service_unavailable on 503 and retries with backoff" do
35+
error = %LivebookProto.Error{details: "Service temporarily unavailable"}
36+
encoded = LivebookProto.Error.encode(error)
37+
38+
start_error_server(503, encoded, "application/octet-stream")
39+
40+
{:ok, conn_pid} = Connection.start_link(self(), [{"x-test", "true"}])
41+
42+
# First attempt
43+
assert_receive {:service_unavailable, "Service temporarily unavailable"}, 1000
44+
assert Process.alive?(conn_pid)
45+
46+
# Retry after backoff
47+
assert_receive {:service_unavailable, _}, 1000
48+
assert Process.alive?(conn_pid)
49+
end
50+
end
51+
52+
defp start_error_server(status_code, body, content_type) do
53+
port = get_free_port()
54+
Application.put_env(:livebook, :teams_url, "http://localhost:#{port}")
55+
56+
plug =
57+
{__MODULE__.ErrorPlug, status_code: status_code, body: body, content_type: content_type}
58+
59+
start_supervised!({Bandit, plug: plug, port: port, startup_log: false})
60+
end
61+
62+
defp get_free_port do
63+
{:ok, socket} = :gen_tcp.listen(0, [])
64+
{:ok, port} = :inet.port(socket)
65+
:gen_tcp.close(socket)
66+
port
67+
end
68+
69+
defmodule ErrorPlug do
70+
@behaviour Plug
71+
72+
@impl true
73+
def init(opts), do: opts
74+
75+
@impl true
76+
def call(conn, opts) do
77+
status_code = Keyword.fetch!(opts, :status_code)
78+
body = Keyword.fetch!(opts, :body)
79+
content_type = Keyword.fetch!(opts, :content_type)
80+
81+
conn
82+
|> Plug.Conn.put_resp_content_type(content_type)
83+
|> Plug.Conn.send_resp(status_code, body)
84+
end
85+
end
86+
end

test/livebook_teams/hubs/team_client_test.exs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,29 @@ defmodule Livebook.Hubs.TeamClientTest do
3939
start_supervised!({TeamClient, team})
4040
assert_receive {:hub_server_error, ^id, ^error}
4141
end
42+
43+
test "handles service unavailable without deleting hub", %{team: team} do
44+
id = team.id
45+
reason = "Service temporarily unavailable. Please try again."
46+
47+
assert {:ok, pid} = TeamClient.start_link(team)
48+
assert_receive {:hub_connected, ^id}
49+
assert_receive {:client_connected, ^id}
50+
51+
# Simulate receiving a service_unavailable message from the Connection process
52+
send(pid, {:service_unavailable, reason})
53+
54+
# Should broadcast hub_connection_failed
55+
assert_receive {:hub_connection_failed, ^id, ^reason}
56+
57+
# Hub should NOT be deleted (unlike server_error which deletes the hub)
58+
refute_receive {:hub_deleted, ^id}
59+
60+
# TeamClient should still be running
61+
assert Process.alive?(pid)
62+
63+
TeamClient.stop(id)
64+
end
4265
end
4366

4467
describe "handle user_connected event" do

0 commit comments

Comments
 (0)