This commit is contained in:
Shizun Ge 2024-04-19 23:19:44 -07:00 committed by GitHub
commit bd522095e5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 78 additions and 43 deletions

View File

@ -108,7 +108,7 @@ func (f *Feed) CheckedNow() {
} }
// ScheduleNextCheck set "next_check_at" of a feed based on the scheduler selected from the configuration. // ScheduleNextCheck set "next_check_at" of a feed based on the scheduler selected from the configuration.
func (f *Feed) ScheduleNextCheck(weeklyCount int, newTTL int) { func (f *Feed) ScheduleNextCheck(weeklyCount int, newTTL int, rateLimited bool) {
f.TTL = newTTL f.TTL = newTTL
// Default to the global config Polling Frequency. // Default to the global config Polling Frequency.
var intervalMinutes int var intervalMinutes int
@ -124,6 +124,9 @@ func (f *Feed) ScheduleNextCheck(weeklyCount int, newTTL int) {
default: default:
intervalMinutes = config.Opts.SchedulerRoundRobinMinInterval() intervalMinutes = config.Opts.SchedulerRoundRobinMinInterval()
} }
if rateLimited {
intervalMinutes += (12 * 60)
}
// If the feed has a TTL defined, we use it to make sure we don't check it too often. // If the feed has a TTL defined, we use it to make sure we don't check it too often.
if newTTL > intervalMinutes && newTTL > 0 { if newTTL > intervalMinutes && newTTL > 0 {
intervalMinutes = newTTL intervalMinutes = newTTL

View File

@ -15,6 +15,7 @@ import (
const ( const (
largeWeeklyCount = 10080 largeWeeklyCount = 10080
noNewTTL = 0 noNewTTL = 0
noRateLimited = false
) )
func TestFeedCategorySetter(t *testing.T) { func TestFeedCategorySetter(t *testing.T) {
@ -89,7 +90,7 @@ func TestFeedScheduleNextCheckDefault(t *testing.T) {
timeBefore := time.Now() timeBefore := time.Now()
feed := &Feed{} feed := &Feed{}
weeklyCount := 10 weeklyCount := 10
feed.ScheduleNextCheck(weeklyCount, noNewTTL) feed.ScheduleNextCheck(weeklyCount, noNewTTL, noRateLimited)
if feed.NextCheckAt.IsZero() { if feed.NextCheckAt.IsZero() {
t.Error(`The next_check_at must be set`) t.Error(`The next_check_at must be set`)
@ -115,7 +116,7 @@ func TestFeedScheduleNextCheckRoundRobinMinInterval(t *testing.T) {
timeBefore := time.Now() timeBefore := time.Now()
feed := &Feed{} feed := &Feed{}
weeklyCount := 100 weeklyCount := 100
feed.ScheduleNextCheck(weeklyCount, noNewTTL) feed.ScheduleNextCheck(weeklyCount, noNewTTL, noRateLimited)
if feed.NextCheckAt.IsZero() { if feed.NextCheckAt.IsZero() {
t.Error(`The next_check_at must be set`) t.Error(`The next_check_at must be set`)
@ -144,7 +145,7 @@ func TestFeedScheduleNextCheckEntryFrequencyMaxInterval(t *testing.T) {
feed := &Feed{} feed := &Feed{}
// Use a very small weekly count to trigger the max interval // Use a very small weekly count to trigger the max interval
weeklyCount := 1 weeklyCount := 1
feed.ScheduleNextCheck(weeklyCount, noNewTTL) feed.ScheduleNextCheck(weeklyCount, noNewTTL, noRateLimited)
if feed.NextCheckAt.IsZero() { if feed.NextCheckAt.IsZero() {
t.Error(`The next_check_at must be set`) t.Error(`The next_check_at must be set`)
@ -173,7 +174,7 @@ func TestFeedScheduleNextCheckEntryFrequencyMaxIntervalZeroWeeklyCount(t *testin
feed := &Feed{} feed := &Feed{}
// Use a very small weekly count to trigger the max interval // Use a very small weekly count to trigger the max interval
weeklyCount := 0 weeklyCount := 0
feed.ScheduleNextCheck(weeklyCount, noNewTTL) feed.ScheduleNextCheck(weeklyCount, noNewTTL, noRateLimited)
if feed.NextCheckAt.IsZero() { if feed.NextCheckAt.IsZero() {
t.Error(`The next_check_at must be set`) t.Error(`The next_check_at must be set`)
@ -202,7 +203,7 @@ func TestFeedScheduleNextCheckEntryFrequencyMinInterval(t *testing.T) {
feed := &Feed{} feed := &Feed{}
// Use a very large weekly count to trigger the min interval // Use a very large weekly count to trigger the min interval
weeklyCount := largeWeeklyCount weeklyCount := largeWeeklyCount
feed.ScheduleNextCheck(weeklyCount, noNewTTL) feed.ScheduleNextCheck(weeklyCount, noNewTTL, noRateLimited)
if feed.NextCheckAt.IsZero() { if feed.NextCheckAt.IsZero() {
t.Error(`The next_check_at must be set`) t.Error(`The next_check_at must be set`)
@ -228,7 +229,7 @@ func TestFeedScheduleNextCheckEntryFrequencyFactor(t *testing.T) {
timeBefore := time.Now() timeBefore := time.Now()
feed := &Feed{} feed := &Feed{}
weeklyCount := 7 weeklyCount := 7
feed.ScheduleNextCheck(weeklyCount, noNewTTL) feed.ScheduleNextCheck(weeklyCount, noNewTTL, noRateLimited)
if feed.NextCheckAt.IsZero() { if feed.NextCheckAt.IsZero() {
t.Error(`The next_check_at must be set`) t.Error(`The next_check_at must be set`)
@ -260,7 +261,7 @@ func TestFeedScheduleNextCheckEntryFrequencySmallNewTTL(t *testing.T) {
weeklyCount := largeWeeklyCount weeklyCount := largeWeeklyCount
// TTL is smaller than minInterval. // TTL is smaller than minInterval.
newTTL := minInterval / 2 newTTL := minInterval / 2
feed.ScheduleNextCheck(weeklyCount, newTTL) feed.ScheduleNextCheck(weeklyCount, newTTL, noRateLimited)
if feed.NextCheckAt.IsZero() { if feed.NextCheckAt.IsZero() {
t.Error(`The next_check_at must be set`) t.Error(`The next_check_at must be set`)
@ -296,7 +297,7 @@ func TestFeedScheduleNextCheckEntryFrequencyLargeNewTTL(t *testing.T) {
weeklyCount := largeWeeklyCount weeklyCount := largeWeeklyCount
// TTL is larger than minInterval. // TTL is larger than minInterval.
newTTL := minInterval * 2 newTTL := minInterval * 2
feed.ScheduleNextCheck(weeklyCount, newTTL) feed.ScheduleNextCheck(weeklyCount, newTTL, noRateLimited)
if feed.NextCheckAt.IsZero() { if feed.NextCheckAt.IsZero() {
t.Error(`The next_check_at must be set`) t.Error(`The next_check_at must be set`)
@ -309,3 +310,25 @@ func TestFeedScheduleNextCheckEntryFrequencyLargeNewTTL(t *testing.T) {
t.Error(`The next_check_at should be after timeBefore + entry frequency min interval`) t.Error(`The next_check_at should be after timeBefore + entry frequency min interval`)
} }
} }
func TestFeedScheduleNextCheckRateLimited(t *testing.T) {
var err error
parser := config.NewParser()
config.Opts, err = parser.ParseEnvironmentVariables()
if err != nil {
t.Fatalf(`Parsing failure: %v`, err)
}
feed := &Feed{}
weeklyCount := 10
rateLimited := true
feed.ScheduleNextCheck(weeklyCount, noNewTTL, rateLimited)
if feed.NextCheckAt.IsZero() {
t.Error(`The next_check_at must be set`)
}
if feed.NextCheckAt.Before(time.Now().Add(time.Minute * time.Duration(60*12))) {
t.Error(`The next_check_at should not be before the now + 12 hours`)
}
}

View File

@ -67,6 +67,10 @@ func (r *ResponseHandler) IsModified(lastEtagValue, lastModifiedValue string) bo
return true return true
} }
func (r *ResponseHandler) IsRateLimited() bool {
return r.httpResponse.StatusCode == http.StatusTooManyRequests
}
func (r *ResponseHandler) Close() { func (r *ResponseHandler) Close() {
if r.httpResponse != nil && r.httpResponse.Body != nil && r.clientErr == nil { if r.httpResponse != nil && r.httpResponse.Body != nil && r.clientErr == nil {
r.httpResponse.Body.Close() r.httpResponse.Body.Close()

View File

@ -198,12 +198,13 @@ func CreateFeed(store *storage.Storage, userID int64, feedCreationRequest *model
} }
// RefreshFeed refreshes a feed. // RefreshFeed refreshes a feed.
func RefreshFeed(store *storage.Storage, userID, feedID int64, forceRefresh bool) *locale.LocalizedErrorWrapper { func RefreshFeed(store *storage.Storage, userID, feedID int64, forceRefresh bool) (localizedError *locale.LocalizedErrorWrapper) {
slog.Debug("Begin feed refresh process", slog.Debug("Begin feed refresh process",
slog.Int64("user_id", userID), slog.Int64("user_id", userID),
slog.Int64("feed_id", feedID), slog.Int64("feed_id", feedID),
slog.Bool("force_refresh", forceRefresh), slog.Bool("force_refresh", forceRefresh),
) )
localizedError = nil
user, storeErr := store.UserByID(userID) user, storeErr := store.UserByID(userID)
if storeErr != nil { if storeErr != nil {
@ -221,6 +222,7 @@ func RefreshFeed(store *storage.Storage, userID, feedID int64, forceRefresh bool
weeklyEntryCount := 0 weeklyEntryCount := 0
newTTL := 0 newTTL := 0
rateLimited := false
if config.Opts.PollingScheduler() == model.SchedulerEntryFrequency { if config.Opts.PollingScheduler() == model.SchedulerEntryFrequency {
var weeklyCountErr error var weeklyCountErr error
weeklyEntryCount, weeklyCountErr = store.WeeklyFeedEntryCount(userID, feedID) weeklyEntryCount, weeklyCountErr = store.WeeklyFeedEntryCount(userID, feedID)
@ -230,7 +232,31 @@ func RefreshFeed(store *storage.Storage, userID, feedID int64, forceRefresh bool
} }
originalFeed.CheckedNow() originalFeed.CheckedNow()
originalFeed.ScheduleNextCheck(weeklyEntryCount, newTTL) // Commit the result to the database at the end of this function.
// If we met an error before entering the defer function, localizedError would not be nil.
defer func() {
originalFeed.ScheduleNextCheck(weeklyEntryCount, newTTL, rateLimited)
slog.Debug("Updated next check date",
slog.Int64("user_id", userID),
slog.Int64("feed_id", feedID),
slog.Int("weeklyEntryCount", weeklyEntryCount),
slog.Int("ttl", newTTL),
slog.Bool("rateLimited", rateLimited),
slog.Time("new_next_check_at", originalFeed.NextCheckAt),
)
if localizedError == nil {
// We have not encountered any error before entering this delay function.
originalFeed.ResetErrorCounter()
if storeErr := store.UpdateFeed(originalFeed); storeErr != nil {
// Update the return value when there is an error.
localizedError = locale.NewLocalizedErrorWrapper(storeErr, "error.database_error", storeErr)
}
}
if localizedError != nil {
originalFeed.WithTranslatedErrorMessage(localizedError.Translate(user.Language))
store.UpdateFeedError(originalFeed)
}
}()
requestBuilder := fetcher.NewRequestBuilder() requestBuilder := fetcher.NewRequestBuilder()
requestBuilder.WithUsernameAndPassword(originalFeed.Username, originalFeed.Password) requestBuilder.WithUsernameAndPassword(originalFeed.Username, originalFeed.Password)
@ -251,17 +277,18 @@ func RefreshFeed(store *storage.Storage, userID, feedID int64, forceRefresh bool
responseHandler := fetcher.NewResponseHandler(requestBuilder.ExecuteRequest(originalFeed.FeedURL)) responseHandler := fetcher.NewResponseHandler(requestBuilder.ExecuteRequest(originalFeed.FeedURL))
defer responseHandler.Close() defer responseHandler.Close()
if localizedError := responseHandler.LocalizedError(); localizedError != nil { rateLimited = responseHandler.IsRateLimited()
if rateLimited {
slog.Warn("Feed is rate limited (429 status code)", slog.String("feed_url", originalFeed.FeedURL))
}
if localizedError = responseHandler.LocalizedError(); localizedError != nil {
slog.Warn("Unable to fetch feed", slog.String("feed_url", originalFeed.FeedURL), slog.Any("error", localizedError.Error())) slog.Warn("Unable to fetch feed", slog.String("feed_url", originalFeed.FeedURL), slog.Any("error", localizedError.Error()))
originalFeed.WithTranslatedErrorMessage(localizedError.Translate(user.Language))
store.UpdateFeedError(originalFeed)
return localizedError return localizedError
} }
if store.AnotherFeedURLExists(userID, originalFeed.ID, responseHandler.EffectiveURL()) { if store.AnotherFeedURLExists(userID, originalFeed.ID, responseHandler.EffectiveURL()) {
localizedError := locale.NewLocalizedErrorWrapper(ErrDuplicatedFeed, "error.duplicated_feed") localizedError = locale.NewLocalizedErrorWrapper(ErrDuplicatedFeed, "error.duplicated_feed")
originalFeed.WithTranslatedErrorMessage(localizedError.Translate(user.Language))
store.UpdateFeedError(originalFeed)
return localizedError return localizedError
} }
@ -279,27 +306,16 @@ func RefreshFeed(store *storage.Storage, userID, feedID int64, forceRefresh bool
updatedFeed, parseErr := parser.ParseFeed(responseHandler.EffectiveURL(), bytes.NewReader(responseBody)) updatedFeed, parseErr := parser.ParseFeed(responseHandler.EffectiveURL(), bytes.NewReader(responseBody))
if parseErr != nil { if parseErr != nil {
localizedError := locale.NewLocalizedErrorWrapper(parseErr, "error.unable_to_parse_feed", parseErr)
if errors.Is(parseErr, parser.ErrFeedFormatNotDetected) { if errors.Is(parseErr, parser.ErrFeedFormatNotDetected) {
localizedError = locale.NewLocalizedErrorWrapper(parseErr, "error.feed_format_not_detected", parseErr) localizedError = locale.NewLocalizedErrorWrapper(parseErr, "error.feed_format_not_detected", parseErr)
} else {
localizedError = locale.NewLocalizedErrorWrapper(parseErr, "error.unable_to_parse_feed", parseErr)
} }
originalFeed.WithTranslatedErrorMessage(localizedError.Translate(user.Language))
store.UpdateFeedError(originalFeed)
return localizedError return localizedError
} }
// If the feed has a TTL defined, we use it to make sure we don't check it too often. // If the feed has a TTL defined, we use it to make sure we don't check it too often.
newTTL = updatedFeed.TTL newTTL = updatedFeed.TTL
// Set the next check at with updated arguments.
originalFeed.ScheduleNextCheck(weeklyEntryCount, newTTL)
slog.Debug("Updated next check date",
slog.Int64("user_id", userID),
slog.Int64("feed_id", feedID),
slog.Int("ttl", newTTL),
slog.Time("new_next_check_at", originalFeed.NextCheckAt),
)
originalFeed.Entries = updatedFeed.Entries originalFeed.Entries = updatedFeed.Entries
processor.ProcessFeedEntries(store, originalFeed, user, forceRefresh) processor.ProcessFeedEntries(store, originalFeed, user, forceRefresh)
@ -308,9 +324,7 @@ func RefreshFeed(store *storage.Storage, userID, feedID int64, forceRefresh bool
updateExistingEntries := forceRefresh || !originalFeed.Crawler updateExistingEntries := forceRefresh || !originalFeed.Crawler
newEntries, storeErr := store.RefreshFeedEntries(originalFeed.UserID, originalFeed.ID, originalFeed.Entries, updateExistingEntries) newEntries, storeErr := store.RefreshFeedEntries(originalFeed.UserID, originalFeed.ID, originalFeed.Entries, updateExistingEntries)
if storeErr != nil { if storeErr != nil {
localizedError := locale.NewLocalizedErrorWrapper(storeErr, "error.database_error", storeErr) localizedError = locale.NewLocalizedErrorWrapper(storeErr, "error.database_error", storeErr)
originalFeed.WithTranslatedErrorMessage(localizedError.Translate(user.Language))
store.UpdateFeedError(originalFeed)
return localizedError return localizedError
} }
@ -344,16 +358,7 @@ func RefreshFeed(store *storage.Storage, userID, feedID int64, forceRefresh bool
) )
} }
originalFeed.ResetErrorCounter() return localizedError
if storeErr := store.UpdateFeed(originalFeed); storeErr != nil {
localizedError := locale.NewLocalizedErrorWrapper(storeErr, "error.database_error", storeErr)
originalFeed.WithTranslatedErrorMessage(localizedError.Translate(user.Language))
store.UpdateFeedError(originalFeed)
return localizedError
}
return nil
} }
func checkFeedIcon(store *storage.Storage, requestBuilder *fetcher.RequestBuilder, feedID int64, websiteURL, feedIconURL string) { func checkFeedIcon(store *storage.Storage, requestBuilder *fetcher.RequestBuilder, feedID int64, websiteURL, feedIconURL string) {