Skip to content

Commit

Permalink
Add materialized views
Browse files Browse the repository at this point in the history
  • Loading branch information
zhenghaoz committed Oct 26, 2024
1 parent 8c64d7a commit b5b4b42
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 23 deletions.
6 changes: 5 additions & 1 deletion storage/data/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ package data
import (
"context"
"encoding/json"
"reflect"
"net/url"
"reflect"
"sort"
"strings"
"time"
Expand Down Expand Up @@ -147,6 +147,10 @@ type Feedback struct {
Comment string `gorm:"column:comment" mapsstructure:"comment"`
}

type UserFeedback Feedback

type ItemFeedback Feedback

// SortFeedbacks sorts feedback from latest to oldest.
func SortFeedbacks(feedback []Feedback) {
sort.Sort(feedbackSorter(feedback))
Expand Down
9 changes: 6 additions & 3 deletions storage/data/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,9 +311,10 @@ func (suite *baseTestSuite) TestFeedback() {
// Get typed feedback by user
ret, err = suite.Database.GetUserFeedback(ctx, "2", lo.ToPtr(time.Now()), positiveFeedbackType)
suite.NoError(err)
suite.Equal(1, len(ret))
suite.Equal("2", ret[0].UserId)
suite.Equal("4", ret[0].ItemId)
if suite.Equal(1, len(ret)) {
suite.Equal("2", ret[0].UserId)
suite.Equal("4", ret[0].ItemId)
}
// Get all feedback by user
ret, err = suite.Database.GetUserFeedback(ctx, "2", lo.ToPtr(time.Now()))
suite.NoError(err)
Expand Down Expand Up @@ -580,6 +581,8 @@ func (suite *baseTestSuite) TestDeleteFeedback() {
// RowAffected isn't supported by ClickHouse,
suite.Equal(3, deleteCount)
}
err = suite.Database.Optimize()
suite.NoError(err)
ret, err = suite.Database.GetUserItemFeedback(ctx, "2", "3")
suite.NoError(err)
suite.Empty(ret)
Expand Down
104 changes: 86 additions & 18 deletions storage/data/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ type SQLDatabase struct {
// Optimize is used by ClickHouse only.
func (d *SQLDatabase) Optimize() error {
if d.driver == ClickHouse {
for _, tableName := range []string{d.UsersTable(), d.ItemsTable(), d.FeedbackTable()} {
for _, tableName := range []string{d.UsersTable(), d.ItemsTable(), d.FeedbackTable(), d.UserFeedbackTable(), d.ItemFeedbackTable()} {
_, err := d.client.Exec("OPTIMIZE TABLE " + tableName)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -247,8 +247,8 @@ func (d *SQLDatabase) Init() error {
}
type Feedback struct {
FeedbackType string `gorm:"column:feedback_type;type:String"`
UserId string `gorm:"column:user_id;type:String;index:user_index,type:bloom_filter(0.01),granularity:1"`
ItemId string `gorm:"column:item_id;type:String;index:item_index,type:bloom_filter(0.01),granularity:1"`
UserId string `gorm:"column:user_id;type:String"`
ItemId string `gorm:"column:item_id;type:String"`
Timestamp time.Time `gorm:"column:time_stamp;type:DateTime64(9,'UTC')"`
Comment string `gorm:"column:comment;type:String"`
Version struct{} `gorm:"column:version;type:DateTime"`
Expand All @@ -257,6 +257,27 @@ func (d *SQLDatabase) Init() error {
if err != nil {
return errors.Trace(err)
}
// create materialized views
type UserFeedback Feedback
err = d.gormDB.Set("gorm:table_options", "ENGINE = ReplacingMergeTree(version) ORDER BY (user_id, item_id, feedback_type)").AutoMigrate(UserFeedback{})
if err != nil {
return errors.Trace(err)
}
err = d.gormDB.Exec(fmt.Sprintf("CREATE MATERIALIZED VIEW %s_mv TO %s AS SELECT * FROM %s",
d.UserFeedbackTable(), d.UserFeedbackTable(), d.FeedbackTable())).Error
if err != nil {
return errors.Trace(err)
}
type ItemFeedback Feedback
err = d.gormDB.Set("gorm:table_options", "ENGINE = ReplacingMergeTree(version) ORDER BY (item_id, user_id, feedback_type)").AutoMigrate(ItemFeedback{})
if err != nil {
return errors.Trace(err)
}
err = d.gormDB.Exec(fmt.Sprintf("CREATE MATERIALIZED VIEW %s_mv TO %s AS SELECT * FROM %s",
d.ItemFeedbackTable(), d.ItemFeedbackTable(), d.FeedbackTable())).Error
if err != nil {
return errors.Trace(err)
}
}
return nil
}
Expand All @@ -271,15 +292,16 @@ func (d *SQLDatabase) Close() error {
}

func (d *SQLDatabase) Purge() error {
tables := []string{d.ItemsTable(), d.FeedbackTable(), d.UsersTable()}
if d.driver == ClickHouse {
tables := []string{d.ItemsTable(), d.FeedbackTable(), d.UsersTable(), d.UserFeedbackTable(), d.ItemFeedbackTable()}
for _, tableName := range tables {
err := d.gormDB.Exec(fmt.Sprintf("alter table %s delete where 1=1", tableName)).Error
if err != nil {
return errors.Trace(err)
}
}
} else {
tables := []string{d.ItemsTable(), d.FeedbackTable(), d.UsersTable()}
for _, tableName := range tables {
err := d.gormDB.Exec(fmt.Sprintf("DELETE FROM %s", tableName)).Error
if err != nil {
Expand Down Expand Up @@ -357,6 +379,14 @@ func (d *SQLDatabase) DeleteItem(ctx context.Context, itemId string) error {
if err := d.gormDB.WithContext(ctx).Delete(&Feedback{}, "item_id = ?", itemId).Error; err != nil {
return errors.Trace(err)
}
if d.driver == ClickHouse {
if err := d.gormDB.WithContext(ctx).Delete(&ItemFeedback{}, "item_id = ?", itemId).Error; err != nil {
return errors.Trace(err)
}
if err := d.gormDB.WithContext(ctx).Delete(&UserFeedback{}, "item_id = ?", itemId).Error; err != nil {
return errors.Trace(err)
}
}
return nil
}

Expand Down Expand Up @@ -492,7 +522,13 @@ func (d *SQLDatabase) GetItemStream(ctx context.Context, batchSize int, timeLimi

// GetItemFeedback returns feedback of a item from MySQL.
func (d *SQLDatabase) GetItemFeedback(ctx context.Context, itemId string, feedbackTypes ...string) ([]Feedback, error) {
tx := d.gormDB.WithContext(ctx).Table(d.FeedbackTable()).Select("user_id, item_id, feedback_type, time_stamp")
tx := d.gormDB.WithContext(ctx)
if d.driver == ClickHouse {
tx = tx.Table(d.ItemFeedbackTable())
} else {
tx = tx.Table(d.FeedbackTable())
}
tx = tx.Select("user_id, item_id, feedback_type, time_stamp")
switch d.driver {
case SQLite:
tx.Where("time_stamp <= DATETIME() AND item_id = ?", itemId)
Expand Down Expand Up @@ -561,6 +597,14 @@ func (d *SQLDatabase) DeleteUser(ctx context.Context, userId string) error {
if err := d.gormDB.WithContext(ctx).Delete(&Feedback{}, "user_id = ?", userId).Error; err != nil {
return errors.Trace(err)
}
if d.driver == ClickHouse {
if err := d.gormDB.WithContext(ctx).Delete(&ItemFeedback{}, "user_id = ?", userId).Error; err != nil {
return errors.Trace(err)
}
if err := d.gormDB.WithContext(ctx).Delete(&UserFeedback{}, "user_id = ?", userId).Error; err != nil {
return errors.Trace(err)
}
}
return nil
}

Expand Down Expand Up @@ -676,8 +720,13 @@ func (d *SQLDatabase) GetUserStream(ctx context.Context, batchSize int) (chan []

// GetUserFeedback returns feedback of a user from MySQL.
func (d *SQLDatabase) GetUserFeedback(ctx context.Context, userId string, endTime *time.Time, feedbackTypes ...string) ([]Feedback, error) {
tx := d.gormDB.WithContext(ctx).Table(d.FeedbackTable()).
Select("feedback_type, user_id, item_id, time_stamp, comment").
tx := d.gormDB.WithContext(ctx)
if d.driver == ClickHouse {
tx = tx.Table(d.UserFeedbackTable())
} else {
tx = tx.Table(d.FeedbackTable())
}
tx = tx.Select("feedback_type, user_id, item_id, time_stamp, comment").
Where("user_id = ?", userId)
if endTime != nil {
tx.Where("time_stamp <= ?", d.convertTimeZone(endTime))
Expand Down Expand Up @@ -957,8 +1006,13 @@ func (d *SQLDatabase) GetFeedbackStream(ctx context.Context, batchSize int, scan

// GetUserItemFeedback gets a feedback by user id and item id from MySQL.
func (d *SQLDatabase) GetUserItemFeedback(ctx context.Context, userId, itemId string, feedbackTypes ...string) ([]Feedback, error) {
tx := d.gormDB.WithContext(ctx).Table(d.FeedbackTable()).
Select("feedback_type, user_id, item_id, time_stamp, comment").
tx := d.gormDB.WithContext(ctx)
if d.driver == ClickHouse {
tx = tx.Table(d.UserFeedbackTable())
} else {
tx = tx.Table(d.FeedbackTable())
}
tx = tx.Select("feedback_type, user_id, item_id, time_stamp, comment").
Where("user_id = ? AND item_id = ?", userId, itemId)
if len(feedbackTypes) > 0 {
tx.Where("feedback_type IN ?", feedbackTypes)
Expand All @@ -981,18 +1035,32 @@ func (d *SQLDatabase) GetUserItemFeedback(ctx context.Context, userId, itemId st

// DeleteUserItemFeedback deletes a feedback by user id and item id from MySQL.
func (d *SQLDatabase) DeleteUserItemFeedback(ctx context.Context, userId, itemId string, feedbackTypes ...string) (int, error) {
tx := d.gormDB.WithContext(ctx).Where("user_id = ? AND item_id = ?", userId, itemId)
if len(feedbackTypes) > 0 {
tx.Where("feedback_type IN ?", feedbackTypes)
deleteUserItemFeedback := func(value any) (int, error) {
tx := d.gormDB.WithContext(ctx).Where("user_id = ? AND item_id = ?", userId, itemId)
if len(feedbackTypes) > 0 {
tx.Where("feedback_type IN ?", feedbackTypes)
}
tx.Delete(value)
if tx.Error != nil {
return 0, errors.Trace(tx.Error)
}
return int(tx.RowsAffected), nil
}
tx.Delete(&Feedback{})
if tx.Error != nil {
return 0, errors.Trace(tx.Error)
rowAffected, err := deleteUserItemFeedback(&Feedback{})
if err != nil {
return 0, errors.Trace(err)
}
if tx.Error != nil && d.driver != ClickHouse {
return 0, errors.Trace(tx.Error)
if d.driver == ClickHouse {
_, err = deleteUserItemFeedback(&UserFeedback{})
if err != nil {
return 0, errors.Trace(err)
}
_, err = deleteUserItemFeedback(&ItemFeedback{})
if err != nil {
return 0, errors.Trace(err)
}
}
return int(tx.RowsAffected), nil
return rowAffected, nil
}

func (d *SQLDatabase) convertTimeZone(timestamp *time.Time) time.Time {
Expand Down
2 changes: 1 addition & 1 deletion storage/data/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func init() {
}
mySqlDSN = env("MYSQL_URI", "mysql://root:password@tcp(127.0.0.1:3306)/")
postgresDSN = env("POSTGRES_URI", "postgres://gorse:gorse_pass@127.0.0.1/")
clickhouseDSN = env("CLICKHOUSE_URI", "clickhouse://127.0.0.1:8123/")
clickhouseDSN = env("CLICKHOUSE_URI", "clickhouse://120.55.97.224:8123/")
}

type MySQLTestSuite struct {
Expand Down
10 changes: 10 additions & 0 deletions storage/scheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,16 @@ func (tp TablePrefix) FeedbackTable() string {
return string(tp) + "feedback"
}

// UserFeedbackTable returns the materialized view of user feedback.
func (tp TablePrefix) UserFeedbackTable() string {
return string(tp) + "user_feedback"
}

// ItemFeedbackTable returns the materialized view of item feedback.
func (tp TablePrefix) ItemFeedbackTable() string {
return string(tp) + "item_feedback"
}

func (tp TablePrefix) Key(key string) string {
return string(tp) + key
}
Expand Down

0 comments on commit b5b4b42

Please sign in to comment.