diff --git a/internal/backend/watchdog_roundtriper.go b/internal/backend/watchdog_roundtriper.go index fb7863002..e3e10d7fe 100644 --- a/internal/backend/watchdog_roundtriper.go +++ b/internal/backend/watchdog_roundtriper.go @@ -2,11 +2,16 @@ package backend import ( "context" + "errors" + "fmt" "io" "net/http" + "sync/atomic" "time" ) +var errRequestTimeout = fmt.Errorf("request timeout") + // watchdogRoundtripper cancels an http request if an upload or download did not make progress // within timeout. The time between fully sending the request and receiving an response is also // limited by this timeout. This ensures that stuck requests are cancelled after some time. @@ -32,12 +37,14 @@ func newWatchdogRoundtripper(rt http.RoundTripper, timeout time.Duration, chunkS func (w *watchdogRoundtripper) RoundTrip(req *http.Request) (*http.Response, error) { timer := time.NewTimer(w.timeout) ctx, cancel := context.WithCancel(req.Context()) + timedOut := &atomic.Bool{} // cancel context if timer expires go func() { defer timer.Stop() select { case <-timer.C: + timedOut.Store(true) cancel() case <-ctx.Done(): } @@ -46,11 +53,14 @@ func (w *watchdogRoundtripper) RoundTrip(req *http.Request) (*http.Response, err kick := func() { timer.Reset(w.timeout) } + isTimeout := func(err error) bool { + return timedOut.Load() && errors.Is(err, context.Canceled) + } req = req.Clone(ctx) if req.Body != nil { // kick watchdog timer as long as uploading makes progress - req.Body = newWatchdogReadCloser(req.Body, w.chunkSize, kick, nil) + req.Body = newWatchdogReadCloser(req.Body, w.chunkSize, kick, nil, isTimeout) } resp, err := w.rt.RoundTrip(req) @@ -60,16 +70,17 @@ func (w *watchdogRoundtripper) RoundTrip(req *http.Request) (*http.Response, err // kick watchdog timer as long as downloading makes progress // cancel context to stop goroutine once response body is closed - resp.Body = newWatchdogReadCloser(resp.Body, w.chunkSize, kick, cancel) + resp.Body = newWatchdogReadCloser(resp.Body, w.chunkSize, kick, cancel, isTimeout) return resp, nil } -func newWatchdogReadCloser(rc io.ReadCloser, chunkSize int, kick func(), close func()) *watchdogReadCloser { +func newWatchdogReadCloser(rc io.ReadCloser, chunkSize int, kick func(), close func(), isTimeout func(err error) bool) *watchdogReadCloser { return &watchdogReadCloser{ rc: rc, chunkSize: chunkSize, kick: kick, close: close, + isTimeout: isTimeout, } } @@ -78,6 +89,7 @@ type watchdogReadCloser struct { chunkSize int kick func() close func() + isTimeout func(err error) bool } var _ io.ReadCloser = &watchdogReadCloser{} @@ -93,6 +105,9 @@ func (w *watchdogReadCloser) Read(p []byte) (n int, err error) { n, err = w.rc.Read(p) w.kick() + if err != nil && w.isTimeout(err) { + err = errRequestTimeout + } return n, err } diff --git a/internal/backend/watchdog_roundtriper_test.go b/internal/backend/watchdog_roundtriper_test.go index b1f589bc0..bc43447e1 100644 --- a/internal/backend/watchdog_roundtriper_test.go +++ b/internal/backend/watchdog_roundtriper_test.go @@ -23,8 +23,11 @@ func TestRead(t *testing.T) { onClose := func() { closed = true } + isTimeout := func(err error) bool { + return false + } - wd := newWatchdogReadCloser(io.NopCloser(bytes.NewReader(data)), 1, kick, onClose) + wd := newWatchdogReadCloser(io.NopCloser(bytes.NewReader(data)), 1, kick, onClose, isTimeout) out, err := io.ReadAll(wd) rtest.OK(t, err) @@ -196,6 +199,6 @@ func TestDownloadTimeout(t *testing.T) { rtest.Equals(t, 200, resp.StatusCode, "unexpected status code") _, err = io.ReadAll(resp.Body) - rtest.Equals(t, context.Canceled, err, "response download not canceled") + rtest.Equals(t, errRequestTimeout, err, "response download not canceled") rtest.OK(t, resp.Body.Close()) }