From e6c6ee441aad1b276913e4888087c01e1572fbd4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Guillot?= Date: Sun, 20 Sep 2020 23:01:01 -0700 Subject: [PATCH] Use a transaction to refresh and create entries Also includes few database improvements: - Speed up entries clean up with an index and a goroutine - Avoid the accumulation of enclosures for some feeds --- database/migration.go | 2 +- database/sql.go | 2 ++ database/sql/schema_version_36.sql | 1 + reader/feed/handler.go | 2 +- storage/enclosure.go | 27 ++++++-------- storage/entry.go | 56 ++++++++++++++++++++---------- storage/feed.go | 15 ++++++-- 7 files changed, 64 insertions(+), 41 deletions(-) create mode 100644 database/sql/schema_version_36.sql diff --git a/database/migration.go b/database/migration.go index 88d6519d..7d22b3d4 100644 --- a/database/migration.go +++ b/database/migration.go @@ -12,7 +12,7 @@ import ( "miniflux.app/logger" ) -const schemaVersion = 35 +const schemaVersion = 36 // Migrate executes database migrations. func Migrate(db *sql.DB) { diff --git a/database/sql.go b/database/sql.go index bc8a4c39..270557f8 100644 --- a/database/sql.go +++ b/database/sql.go @@ -189,6 +189,7 @@ create index entries_user_feed_idx on entries (user_id, feed_id); "schema_version_34": `CREATE INDEX entries_id_user_status_idx ON entries USING btree (id, user_id, status);`, "schema_version_35": `alter table feeds add column fetch_via_proxy bool default false; `, + "schema_version_36": `CREATE INDEX entries_feed_id_status_hash_idx ON entries USING btree (feed_id, status, hash);`, "schema_version_4": `create type entry_sorting_direction as enum('asc', 'desc'); alter table users add column entry_direction entry_sorting_direction default 'asc'; `, @@ -247,6 +248,7 @@ var SqlMapChecksums = map[string]string{ "schema_version_33": "bf38514efeb6c12511f41b1cc484f92722240b0a6ae874c32a958dfea3433d02", "schema_version_34": "1a3e036f652fc98b7564a27013f04e1eb36dd0d68893c723168f134dc1065822", "schema_version_35": "162a55df78eed4b9c9c141878132d5f1d97944b96f35a79e38f55716cdd6b3d2", + "schema_version_36": "8164be7818268ad3d4bdcad03a7868b58e32b27cde9b4f056cd82f7b182a0722", "schema_version_4": "216ea3a7d3e1704e40c797b5dc47456517c27dbb6ca98bf88812f4f63d74b5d9", "schema_version_5": "46397e2f5f2c82116786127e9f6a403e975b14d2ca7b652a48cd1ba843e6a27c", "schema_version_6": "9d05b4fb223f0e60efc716add5048b0ca9c37511cf2041721e20505d6d798ce4", diff --git a/database/sql/schema_version_36.sql b/database/sql/schema_version_36.sql new file mode 100644 index 00000000..660b5f4f --- /dev/null +++ b/database/sql/schema_version_36.sql @@ -0,0 +1 @@ +CREATE INDEX entries_feed_id_status_hash_idx ON entries USING btree (feed_id, status, hash); \ No newline at end of file diff --git a/reader/feed/handler.go b/reader/feed/handler.go index 5feac58d..6c039ff6 100644 --- a/reader/feed/handler.go +++ b/reader/feed/handler.go @@ -141,7 +141,7 @@ func (h *Handler) RefreshFeed(userID, feedID int64) error { processor.ProcessFeedEntries(h.store, originalFeed) // We don't update existing entries when the crawler is enabled (we crawl only inexisting entries). - if storeErr := h.store.UpdateEntries(originalFeed.UserID, originalFeed.ID, originalFeed.Entries, !originalFeed.Crawler); storeErr != nil { + if storeErr := h.store.RefreshFeedEntries(originalFeed.UserID, originalFeed.ID, originalFeed.Entries, !originalFeed.Crawler); storeErr != nil { originalFeed.WithError(storeErr.Error()) h.store.UpdateFeedError(originalFeed) return storeErr diff --git a/storage/enclosure.go b/storage/enclosure.go index 4596696c..aa9f64ef 100644 --- a/storage/enclosure.go +++ b/storage/enclosure.go @@ -5,6 +5,7 @@ package storage // import "miniflux.app/storage" import ( + "database/sql" "fmt" "miniflux.app/model" @@ -55,8 +56,7 @@ func (s *Storage) GetEnclosures(entryID int64) (model.EnclosureList, error) { return enclosures, nil } -// CreateEnclosure creates a new attachment. -func (s *Storage) CreateEnclosure(enclosure *model.Enclosure) error { +func (s *Storage) createEnclosure(tx *sql.Tx, enclosure *model.Enclosure) error { if enclosure.URL == "" { return nil } @@ -69,7 +69,7 @@ func (s *Storage) CreateEnclosure(enclosure *model.Enclosure) error { RETURNING id ` - err := s.db.QueryRow( + err := tx.QueryRow( query, enclosure.URL, enclosure.Size, @@ -85,22 +85,15 @@ func (s *Storage) CreateEnclosure(enclosure *model.Enclosure) error { return nil } -// IsEnclosureExists checks if an attachment exists. -func (s *Storage) IsEnclosureExists(enclosure *model.Enclosure) bool { - var result int - query := `SELECT 1 FROM enclosures WHERE user_id=$1 AND entry_id=$2 AND url=$3` - s.db.QueryRow(query, enclosure.UserID, enclosure.EntryID, enclosure.URL).Scan(&result) - return result >= 1 -} +func (s *Storage) updateEnclosures(tx *sql.Tx, userID, entryID int64, enclosures model.EnclosureList) error { + // We delete all attachments in the transaction to keep only the ones visible in the feeds. + if _, err := tx.Exec(`DELETE FROM enclosures WHERE user_id=$1 AND entry_id=$2`, userID, entryID); err != nil { + return err + } -// UpdateEnclosures add missing attachments while updating a feed. -func (s *Storage) UpdateEnclosures(enclosures model.EnclosureList) error { for _, enclosure := range enclosures { - if !s.IsEnclosureExists(enclosure) { - err := s.CreateEnclosure(enclosure) - if err != nil { - return err - } + if err := s.createEnclosure(tx, enclosure); err != nil { + return err } } diff --git a/storage/entry.go b/storage/entry.go index d956c38c..556f526c 100644 --- a/storage/entry.go +++ b/storage/entry.go @@ -5,6 +5,7 @@ package storage // import "miniflux.app/storage" import ( + "database/sql" "errors" "fmt" "time" @@ -74,7 +75,7 @@ func (s *Storage) UpdateEntryContent(entry *model.Entry) error { } // createEntry add a new entry. -func (s *Storage) createEntry(entry *model.Entry) error { +func (s *Storage) createEntry(tx *sql.Tx, entry *model.Entry) error { query := ` INSERT INTO entries (title, hash, url, comments_url, published_at, content, author, user_id, feed_id, changed_at, document_vectors) @@ -83,7 +84,7 @@ func (s *Storage) createEntry(entry *model.Entry) error { RETURNING id, status ` - err := s.db.QueryRow( + err := tx.QueryRow( query, entry.Title, entry.Hash, @@ -103,7 +104,7 @@ func (s *Storage) createEntry(entry *model.Entry) error { for i := 0; i < len(entry.Enclosures); i++ { entry.Enclosures[i].EntryID = entry.ID entry.Enclosures[i].UserID = entry.UserID - err := s.CreateEnclosure(entry.Enclosures[i]) + err := s.createEnclosure(tx, entry.Enclosures[i]) if err != nil { return err } @@ -115,7 +116,7 @@ func (s *Storage) createEntry(entry *model.Entry) error { // updateEntry updates an entry when a feed is refreshed. // Note: we do not update the published date because some feeds do not contains any date, // it default to time.Now() which could change the order of items on the history page. -func (s *Storage) updateEntry(entry *model.Entry) error { +func (s *Storage) updateEntry(tx *sql.Tx, entry *model.Entry) error { query := ` UPDATE entries @@ -131,7 +132,7 @@ func (s *Storage) updateEntry(entry *model.Entry) error { RETURNING id ` - err := s.db.QueryRow( + err := tx.QueryRow( query, entry.Title, entry.URL, @@ -152,15 +153,19 @@ func (s *Storage) updateEntry(entry *model.Entry) error { enclosure.EntryID = entry.ID } - return s.UpdateEnclosures(entry.Enclosures) + return s.updateEnclosures(tx, entry.UserID, entry.ID, entry.Enclosures) } // entryExists checks if an entry already exists based on its hash when refreshing a feed. -func (s *Storage) entryExists(entry *model.Entry) bool { - var result int - query := `SELECT 1 FROM entries WHERE user_id=$1 AND feed_id=$2 AND hash=$3` - s.db.QueryRow(query, entry.UserID, entry.FeedID, entry.Hash).Scan(&result) - return result == 1 +func (s *Storage) entryExists(tx *sql.Tx, entry *model.Entry) bool { + var result bool + tx.QueryRow( + `SELECT true FROM entries WHERE user_id=$1 AND feed_id=$2 AND hash=$3`, + entry.UserID, + entry.FeedID, + entry.Hash, + ).Scan(&result) + return result } // cleanupEntries deletes from the database entries marked as "removed" and not visible anymore in the feed. @@ -180,31 +185,44 @@ func (s *Storage) cleanupEntries(feedID int64, entryHashes []string) error { return nil } -// UpdateEntries updates a list of entries while refreshing a feed. -func (s *Storage) UpdateEntries(userID, feedID int64, entries model.Entries, updateExistingEntries bool) (err error) { +// RefreshFeedEntries updates feed entries while refreshing a feed. +func (s *Storage) RefreshFeedEntries(userID, feedID int64, entries model.Entries, updateExistingEntries bool) (err error) { var entryHashes []string + for _, entry := range entries { entry.UserID = userID entry.FeedID = feedID - if s.entryExists(entry) { + tx, err := s.db.Begin() + if err != nil { + return fmt.Errorf(`store: unable to start transaction: %v`, err) + } + + if s.entryExists(tx, entry) { if updateExistingEntries { - err = s.updateEntry(entry) + err = s.updateEntry(tx, entry) } } else { - err = s.createEntry(entry) + err = s.createEntry(tx, entry) } if err != nil { + tx.Rollback() return err } + if err := tx.Commit(); err != nil { + return fmt.Errorf(`store: unable to commit transaction: %v`, err) + } + entryHashes = append(entryHashes, entry.Hash) } - if err := s.cleanupEntries(feedID, entryHashes); err != nil { - logger.Error(`store: feed #%d: %v`, feedID, err) - } + go func() { + if err := s.cleanupEntries(feedID, entryHashes); err != nil { + logger.Error(`store: feed #%d: %v`, feedID, err) + } + }() return nil } diff --git a/storage/feed.go b/storage/feed.go index 49f7187f..508288ac 100644 --- a/storage/feed.go +++ b/storage/feed.go @@ -435,12 +435,21 @@ func (s *Storage) CreateFeed(feed *model.Feed) error { feed.Entries[i].FeedID = feed.ID feed.Entries[i].UserID = feed.UserID - if !s.entryExists(feed.Entries[i]) { - err := s.createEntry(feed.Entries[i]) - if err != nil { + tx, err := s.db.Begin() + if err != nil { + return fmt.Errorf(`store: unable to start transaction: %v`, err) + } + + if !s.entryExists(tx, feed.Entries[i]) { + if err := s.createEntry(tx, feed.Entries[i]); err != nil { + tx.Rollback() return err } } + + if err := tx.Commit(); err != nil { + return fmt.Errorf(`store: unable to commit transaction: %v`, err) + } } return nil