diff --git a/internal/model/feed.go b/internal/model/feed.go index 5273eea2..20788eb2 100644 --- a/internal/model/feed.go +++ b/internal/model/feed.go @@ -108,7 +108,7 @@ func (f *Feed) CheckedNow() { } // ScheduleNextCheck set "next_check_at" of a feed based on the scheduler selected from the configuration. -func (f *Feed) ScheduleNextCheck(weeklyCount int, newTTL int) { +func (f *Feed) ScheduleNextCheck(weeklyCount int, newTTL int, rateLimited bool) { f.TTL = newTTL // Default to the global config Polling Frequency. var intervalMinutes int @@ -124,6 +124,9 @@ func (f *Feed) ScheduleNextCheck(weeklyCount int, newTTL int) { default: intervalMinutes = config.Opts.SchedulerRoundRobinMinInterval() } + if rateLimited { + intervalMinutes += (12 * 60) + } // If the feed has a TTL defined, we use it to make sure we don't check it too often. if newTTL > intervalMinutes && newTTL > 0 { intervalMinutes = newTTL diff --git a/internal/model/feed_test.go b/internal/model/feed_test.go index df5c6885..f37bda33 100644 --- a/internal/model/feed_test.go +++ b/internal/model/feed_test.go @@ -15,6 +15,7 @@ import ( const ( largeWeeklyCount = 10080 noNewTTL = 0 + noRateLimited = false ) func TestFeedCategorySetter(t *testing.T) { @@ -89,7 +90,7 @@ func TestFeedScheduleNextCheckDefault(t *testing.T) { timeBefore := time.Now() feed := &Feed{} weeklyCount := 10 - feed.ScheduleNextCheck(weeklyCount, noNewTTL) + feed.ScheduleNextCheck(weeklyCount, noNewTTL, noRateLimited) if feed.NextCheckAt.IsZero() { t.Error(`The next_check_at must be set`) @@ -115,7 +116,7 @@ func TestFeedScheduleNextCheckRoundRobinMinInterval(t *testing.T) { timeBefore := time.Now() feed := &Feed{} weeklyCount := 100 - feed.ScheduleNextCheck(weeklyCount, noNewTTL) + feed.ScheduleNextCheck(weeklyCount, noNewTTL, noRateLimited) if feed.NextCheckAt.IsZero() { t.Error(`The next_check_at must be set`) @@ -144,7 +145,7 @@ func TestFeedScheduleNextCheckEntryFrequencyMaxInterval(t *testing.T) { feed := &Feed{} // Use a very small weekly count to trigger the max interval weeklyCount := 1 - feed.ScheduleNextCheck(weeklyCount, noNewTTL) + feed.ScheduleNextCheck(weeklyCount, noNewTTL, noRateLimited) if feed.NextCheckAt.IsZero() { t.Error(`The next_check_at must be set`) @@ -173,7 +174,7 @@ func TestFeedScheduleNextCheckEntryFrequencyMaxIntervalZeroWeeklyCount(t *testin feed := &Feed{} // Use a very small weekly count to trigger the max interval weeklyCount := 0 - feed.ScheduleNextCheck(weeklyCount, noNewTTL) + feed.ScheduleNextCheck(weeklyCount, noNewTTL, noRateLimited) if feed.NextCheckAt.IsZero() { t.Error(`The next_check_at must be set`) @@ -202,7 +203,7 @@ func TestFeedScheduleNextCheckEntryFrequencyMinInterval(t *testing.T) { feed := &Feed{} // Use a very large weekly count to trigger the min interval weeklyCount := largeWeeklyCount - feed.ScheduleNextCheck(weeklyCount, noNewTTL) + feed.ScheduleNextCheck(weeklyCount, noNewTTL, noRateLimited) if feed.NextCheckAt.IsZero() { t.Error(`The next_check_at must be set`) @@ -228,7 +229,7 @@ func TestFeedScheduleNextCheckEntryFrequencyFactor(t *testing.T) { timeBefore := time.Now() feed := &Feed{} weeklyCount := 7 - feed.ScheduleNextCheck(weeklyCount, noNewTTL) + feed.ScheduleNextCheck(weeklyCount, noNewTTL, noRateLimited) if feed.NextCheckAt.IsZero() { t.Error(`The next_check_at must be set`) @@ -260,7 +261,7 @@ func TestFeedScheduleNextCheckEntryFrequencySmallNewTTL(t *testing.T) { weeklyCount := largeWeeklyCount // TTL is smaller than minInterval. newTTL := minInterval / 2 - feed.ScheduleNextCheck(weeklyCount, newTTL) + feed.ScheduleNextCheck(weeklyCount, newTTL, noRateLimited) if feed.NextCheckAt.IsZero() { t.Error(`The next_check_at must be set`) @@ -296,7 +297,7 @@ func TestFeedScheduleNextCheckEntryFrequencyLargeNewTTL(t *testing.T) { weeklyCount := largeWeeklyCount // TTL is larger than minInterval. newTTL := minInterval * 2 - feed.ScheduleNextCheck(weeklyCount, newTTL) + feed.ScheduleNextCheck(weeklyCount, newTTL, noRateLimited) if feed.NextCheckAt.IsZero() { t.Error(`The next_check_at must be set`) @@ -309,3 +310,25 @@ func TestFeedScheduleNextCheckEntryFrequencyLargeNewTTL(t *testing.T) { t.Error(`The next_check_at should be after timeBefore + entry frequency min interval`) } } + +func TestFeedScheduleNextCheckRateLimited(t *testing.T) { + var err error + parser := config.NewParser() + config.Opts, err = parser.ParseEnvironmentVariables() + if err != nil { + t.Fatalf(`Parsing failure: %v`, err) + } + + feed := &Feed{} + weeklyCount := 10 + rateLimited := true + feed.ScheduleNextCheck(weeklyCount, noNewTTL, rateLimited) + + if feed.NextCheckAt.IsZero() { + t.Error(`The next_check_at must be set`) + } + + if feed.NextCheckAt.Before(time.Now().Add(time.Minute * time.Duration(60*12))) { + t.Error(`The next_check_at should not be before the now + 12 hours`) + } +} diff --git a/internal/reader/fetcher/response_handler.go b/internal/reader/fetcher/response_handler.go index 1de3b384..94287b3c 100644 --- a/internal/reader/fetcher/response_handler.go +++ b/internal/reader/fetcher/response_handler.go @@ -67,6 +67,10 @@ func (r *ResponseHandler) IsModified(lastEtagValue, lastModifiedValue string) bo return true } +func (r *ResponseHandler) IsRateLimited() bool { + return r.httpResponse.StatusCode == http.StatusTooManyRequests +} + func (r *ResponseHandler) Close() { if r.httpResponse != nil && r.httpResponse.Body != nil && r.clientErr == nil { r.httpResponse.Body.Close() diff --git a/internal/reader/handler/handler.go b/internal/reader/handler/handler.go index 2663b3b5..effc864d 100644 --- a/internal/reader/handler/handler.go +++ b/internal/reader/handler/handler.go @@ -198,12 +198,13 @@ func CreateFeed(store *storage.Storage, userID int64, feedCreationRequest *model } // RefreshFeed refreshes a feed. -func RefreshFeed(store *storage.Storage, userID, feedID int64, forceRefresh bool) *locale.LocalizedErrorWrapper { +func RefreshFeed(store *storage.Storage, userID, feedID int64, forceRefresh bool) (localizedError *locale.LocalizedErrorWrapper) { slog.Debug("Begin feed refresh process", slog.Int64("user_id", userID), slog.Int64("feed_id", feedID), slog.Bool("force_refresh", forceRefresh), ) + localizedError = nil user, storeErr := store.UserByID(userID) if storeErr != nil { @@ -221,6 +222,7 @@ func RefreshFeed(store *storage.Storage, userID, feedID int64, forceRefresh bool weeklyEntryCount := 0 newTTL := 0 + rateLimited := false if config.Opts.PollingScheduler() == model.SchedulerEntryFrequency { var weeklyCountErr error weeklyEntryCount, weeklyCountErr = store.WeeklyFeedEntryCount(userID, feedID) @@ -230,7 +232,31 @@ func RefreshFeed(store *storage.Storage, userID, feedID int64, forceRefresh bool } originalFeed.CheckedNow() - originalFeed.ScheduleNextCheck(weeklyEntryCount, newTTL) + // Commit the result to the database at the end of this function. + // If we met an error before entering the defer function, localizedError would not be nil. + defer func() { + originalFeed.ScheduleNextCheck(weeklyEntryCount, newTTL, rateLimited) + slog.Debug("Updated next check date", + slog.Int64("user_id", userID), + slog.Int64("feed_id", feedID), + slog.Int("weeklyEntryCount", weeklyEntryCount), + slog.Int("ttl", newTTL), + slog.Bool("rateLimited", rateLimited), + slog.Time("new_next_check_at", originalFeed.NextCheckAt), + ) + if localizedError == nil { + // We have not encountered any error before entering this delay function. + originalFeed.ResetErrorCounter() + if storeErr := store.UpdateFeed(originalFeed); storeErr != nil { + // Update the return value when there is an error. + localizedError = locale.NewLocalizedErrorWrapper(storeErr, "error.database_error", storeErr) + } + } + if localizedError != nil { + originalFeed.WithTranslatedErrorMessage(localizedError.Translate(user.Language)) + store.UpdateFeedError(originalFeed) + } + }() requestBuilder := fetcher.NewRequestBuilder() requestBuilder.WithUsernameAndPassword(originalFeed.Username, originalFeed.Password) @@ -251,17 +277,18 @@ func RefreshFeed(store *storage.Storage, userID, feedID int64, forceRefresh bool responseHandler := fetcher.NewResponseHandler(requestBuilder.ExecuteRequest(originalFeed.FeedURL)) defer responseHandler.Close() - if localizedError := responseHandler.LocalizedError(); localizedError != nil { + rateLimited = responseHandler.IsRateLimited() + if rateLimited { + slog.Warn("Feed is rate limited (429 status code)", slog.String("feed_url", originalFeed.FeedURL)) + } + + if localizedError = responseHandler.LocalizedError(); localizedError != nil { slog.Warn("Unable to fetch feed", slog.String("feed_url", originalFeed.FeedURL), slog.Any("error", localizedError.Error())) - originalFeed.WithTranslatedErrorMessage(localizedError.Translate(user.Language)) - store.UpdateFeedError(originalFeed) return localizedError } if store.AnotherFeedURLExists(userID, originalFeed.ID, responseHandler.EffectiveURL()) { - localizedError := locale.NewLocalizedErrorWrapper(ErrDuplicatedFeed, "error.duplicated_feed") - originalFeed.WithTranslatedErrorMessage(localizedError.Translate(user.Language)) - store.UpdateFeedError(originalFeed) + localizedError = locale.NewLocalizedErrorWrapper(ErrDuplicatedFeed, "error.duplicated_feed") return localizedError } @@ -279,27 +306,16 @@ func RefreshFeed(store *storage.Storage, userID, feedID int64, forceRefresh bool updatedFeed, parseErr := parser.ParseFeed(responseHandler.EffectiveURL(), bytes.NewReader(responseBody)) if parseErr != nil { - localizedError := locale.NewLocalizedErrorWrapper(parseErr, "error.unable_to_parse_feed", parseErr) - if errors.Is(parseErr, parser.ErrFeedFormatNotDetected) { localizedError = locale.NewLocalizedErrorWrapper(parseErr, "error.feed_format_not_detected", parseErr) + } else { + localizedError = locale.NewLocalizedErrorWrapper(parseErr, "error.unable_to_parse_feed", parseErr) } - - originalFeed.WithTranslatedErrorMessage(localizedError.Translate(user.Language)) - store.UpdateFeedError(originalFeed) return localizedError } // If the feed has a TTL defined, we use it to make sure we don't check it too often. newTTL = updatedFeed.TTL - // Set the next check at with updated arguments. - originalFeed.ScheduleNextCheck(weeklyEntryCount, newTTL) - slog.Debug("Updated next check date", - slog.Int64("user_id", userID), - slog.Int64("feed_id", feedID), - slog.Int("ttl", newTTL), - slog.Time("new_next_check_at", originalFeed.NextCheckAt), - ) originalFeed.Entries = updatedFeed.Entries processor.ProcessFeedEntries(store, originalFeed, user, forceRefresh) @@ -308,9 +324,7 @@ func RefreshFeed(store *storage.Storage, userID, feedID int64, forceRefresh bool updateExistingEntries := forceRefresh || !originalFeed.Crawler newEntries, storeErr := store.RefreshFeedEntries(originalFeed.UserID, originalFeed.ID, originalFeed.Entries, updateExistingEntries) if storeErr != nil { - localizedError := locale.NewLocalizedErrorWrapper(storeErr, "error.database_error", storeErr) - originalFeed.WithTranslatedErrorMessage(localizedError.Translate(user.Language)) - store.UpdateFeedError(originalFeed) + localizedError = locale.NewLocalizedErrorWrapper(storeErr, "error.database_error", storeErr) return localizedError } @@ -344,16 +358,7 @@ func RefreshFeed(store *storage.Storage, userID, feedID int64, forceRefresh bool ) } - originalFeed.ResetErrorCounter() - - if storeErr := store.UpdateFeed(originalFeed); storeErr != nil { - localizedError := locale.NewLocalizedErrorWrapper(storeErr, "error.database_error", storeErr) - originalFeed.WithTranslatedErrorMessage(localizedError.Translate(user.Language)) - store.UpdateFeedError(originalFeed) - return localizedError - } - - return nil + return localizedError } func checkFeedIcon(store *storage.Storage, requestBuilder *fetcher.RequestBuilder, feedID int64, websiteURL, feedIconURL string) {