Skip to content

Commit

Permalink
data: optimize ClickHouse queries via materialized views (#875)
Browse files Browse the repository at this point in the history
(cherry picked from commit b959e38)
  • Loading branch information
zhenghaoz committed Oct 30, 2024
1 parent 0c51548 commit 4c203ac
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 39 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ jobs:
--health-retries 5
clickhouse:
image: clickhouse/clickhouse-server:21.10
image: clickhouse/clickhouse-server:22
ports:
- 8123
options: >-
Expand Down
2 changes: 1 addition & 1 deletion client/docker-compose.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ services:
{% elif database == 'clickhouse' %}

clickhouse:
image: clickhouse/clickhouse-server:21.10
image: clickhouse/clickhouse-server:22
ports:
- 8123:8123
environment:
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ services:
# - mongo_data:/data/db

# clickhouse:
# image: clickhouse/clickhouse-server:21.10
# image: clickhouse/clickhouse-server:22
# ports:
# - 8123:8123
# environment:
Expand Down
6 changes: 5 additions & 1 deletion storage/data/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ package data
import (
"context"
"encoding/json"
"reflect"
"net/url"
"reflect"
"sort"
"strings"
"time"
Expand Down Expand Up @@ -168,6 +168,10 @@ type Feedback struct {
Comment string `gorm:"column:comment" mapsstructure:"comment"`
}

type UserFeedback Feedback

type ItemFeedback Feedback

// SortFeedbacks sorts feedback from latest to oldest.
func SortFeedbacks(feedback []Feedback) {
sort.Sort(feedbackSorter(feedback))
Expand Down
23 changes: 10 additions & 13 deletions storage/data/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ var (
positiveFeedbackType = "positiveFeedbackType"
negativeFeedbackType = "negativeFeedbackType"
duplicateFeedbackType = "duplicateFeedbackType"
dateTime64Zero = time.Date(1900, 1, 1, 0, 0, 0, 0, time.UTC)
)

type baseTestSuite struct {
Expand Down Expand Up @@ -221,10 +222,6 @@ func (suite *baseTestSuite) TestUsers() {
}

func (suite *baseTestSuite) TestFeedback() {
if suite.isClickHouse() {
// TODO: Fix in the next pull request
suite.T().Skip()
}
ctx := context.Background()
// users that already exists
err := suite.Database.BatchInsertUsers(ctx, []User{{"0", []string{"a"}, []string{"x"}, "comment"}})
Expand Down Expand Up @@ -283,8 +280,8 @@ func (suite *baseTestSuite) TestFeedback() {
suite.Equal(strconv.Itoa(i*2), item.ItemId)
if item.ItemId != "0" {
if suite.isClickHouse() {
// ClickHouse returns 1970-01-01 as zero date.
suite.Zero(item.Timestamp.Unix())
// ClickHouse returns 1900-01-01 00:00:00 +0000 UTC as zero date.
suite.Equal(dateTime64Zero, item.Timestamp)
} else {
suite.Zero(item.Timestamp)
}
Expand Down Expand Up @@ -314,9 +311,10 @@ func (suite *baseTestSuite) TestFeedback() {
// Get typed feedback by user
ret, err = suite.Database.GetUserFeedback(ctx, "namespace", "2", lo.ToPtr(time.Now()), positiveFeedbackType)
suite.NoError(err)
suite.Equal(1, len(ret))
suite.Equal("2", ret[0].UserId)
suite.Equal("4", ret[0].ItemId)
if suite.Equal(1, len(ret)) {
suite.Equal("2", ret[0].UserId)
suite.Equal("4", ret[0].ItemId)
}
// Get all feedback by user
ret, err = suite.Database.GetUserFeedback(ctx, "namespace", "2", lo.ToPtr(time.Now()))
suite.NoError(err)
Expand Down Expand Up @@ -471,6 +469,8 @@ func (suite *baseTestSuite) TestItems() {
err = suite.Database.Optimize()

Check failure on line 469 in storage/data/database_test.go

View workflow job for this annotation

GitHub Actions / lint

ineffectual assignment to err (ineffassign)
err = suite.Database.BatchInsertItems(ctx, []Item{{Namespace: "namespace", ItemId: "4", IsHidden: false, Categories: []string{"b"}, Labels: []string{"o"}, Comment: "override"}})
suite.NoError(err)
err = suite.Database.Optimize()
suite.NoError(err)
item, err := suite.Database.GetItem(ctx, "namespace", "4")
suite.NoError(err)
suite.False(item.IsHidden)
Expand Down Expand Up @@ -590,6 +590,7 @@ func (suite *baseTestSuite) TestDeleteFeedback() {
suite.Equal(3, deleteCount)
}
ret, err = suite.Database.GetUserItemFeedback(ctx, "namespace", "2", "3")

Check failure on line 592 in storage/data/database_test.go

View workflow job for this annotation

GitHub Actions / lint

ineffectual assignment to err (ineffassign)
err = suite.Database.Optimize()
suite.NoError(err)
suite.Empty(ret)
feedbackType1 := "type1"
Expand Down Expand Up @@ -670,10 +671,6 @@ func (suite *baseTestSuite) TestTimeLimit() {
}

func (suite *baseTestSuite) TestTimezone() {
if suite.isClickHouse() {
// TODO: Fix in the next pull request
suite.T().Skip()
}
ctx := context.Background()
loc, err := time.LoadLocation("Asia/Tokyo")
suite.NoError(err)
Expand Down
119 changes: 97 additions & 22 deletions storage/data/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ type SQLDatabase struct {
// Optimize is used by ClickHouse only.
func (d *SQLDatabase) Optimize() error {
if d.driver == ClickHouse {
for _, tableName := range []string{d.UsersTable(), d.ItemsTable(), d.FeedbackTable()} {
for _, tableName := range []string{d.UsersTable(), d.ItemsTable(), d.FeedbackTable(), d.UserFeedbackTable(), d.ItemFeedbackTable()} {
_, err := d.client.Exec("OPTIMIZE TABLE " + tableName)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -235,7 +235,7 @@ func (d *SQLDatabase) Init() error {
ItemId string `gorm:"column:item_id;type:String"`
IsHidden int `gorm:"column:is_hidden;type:Boolean;default:0"`
Categories string `gorm:"column:categories;type:String;default:'[]'"`
Timestamp time.Time `gorm:"column:time_stamp;type:Datetime"`
Timestamp time.Time `gorm:"column:time_stamp;type:Datetime64(9,'UTC')"`
Labels string `gorm:"column:labels;type:String;default:'[]'"`
Comment string `gorm:"column:comment;type:String"`
Version struct{} `gorm:"column:version;type:DateTime"`
Expand All @@ -258,13 +258,40 @@ func (d *SQLDatabase) Init() error {
type Feedback struct {
Namespace string `gorm:"column:namespace;type:String"`
FeedbackType string `gorm:"column:feedback_type;type:String"`
UserId string `gorm:"column:user_id;type:String;index:user_index,type:bloom_filter(0.01),granularity:1"`
ItemId string `gorm:"column:item_id;type:String;index:item_index,type:bloom_filter(0.01),granularity:1"`
Timestamp time.Time `gorm:"column:time_stamp;type:DateTime"`
UserId string `gorm:"column:user_id;type:String"`
ItemId string `gorm:"column:item_id;type:String"`
Timestamp time.Time `gorm:"column:time_stamp;type:DateTime64(9,'UTC')"`
Comment string `gorm:"column:comment;type:String"`
Version struct{} `gorm:"column:version;type:DateTime"`
}
err = d.gormDB.Set("gorm:table_options", "ENGINE = ReplacingMergeTree(version) ORDER BY (namespace, feedback_type, user_id, item_id)").AutoMigrate(Feedback{})
err = d.gormDB.Set("gorm:table_options",
"ENGINE = ReplacingMergeTree(version) ORDER BY (namespace, feedback_type, user_id, item_id)").
AutoMigrate(Feedback{})
if err != nil {
return errors.Trace(err)
}
// create materialized views
type UserFeedback Feedback
err = d.gormDB.Set("gorm:table_options",
"ENGINE = ReplacingMergeTree(version) ORDER BY (user_id, namespace, item_id, feedback_type)").
AutoMigrate(UserFeedback{})
if err != nil {
return errors.Trace(err)
}
err = d.gormDB.Exec(fmt.Sprintf("CREATE MATERIALIZED VIEW IF NOT EXISTS %s_mv TO %s AS SELECT * FROM %s",
d.UserFeedbackTable(), d.UserFeedbackTable(), d.FeedbackTable())).Error
if err != nil {
return errors.Trace(err)
}
type ItemFeedback Feedback
err = d.gormDB.Set("gorm:table_options",
"ENGINE = ReplacingMergeTree(version) ORDER BY (namespace, item_id, user_id, feedback_type)").
AutoMigrate(ItemFeedback{})
if err != nil {
return errors.Trace(err)
}
err = d.gormDB.Exec(fmt.Sprintf("CREATE MATERIALIZED VIEW IF NOT EXISTS %s_mv TO %s AS SELECT * FROM %s",
d.ItemFeedbackTable(), d.ItemFeedbackTable(), d.FeedbackTable())).Error
if err != nil {
return errors.Trace(err)
}
Expand All @@ -282,15 +309,16 @@ func (d *SQLDatabase) Close() error {
}

func (d *SQLDatabase) Purge() error {
tables := []string{d.ItemsTable(), d.FeedbackTable(), d.UsersTable()}
if d.driver == ClickHouse {
tables := []string{d.ItemsTable(), d.FeedbackTable(), d.UsersTable(), d.UserFeedbackTable(), d.ItemFeedbackTable()}
for _, tableName := range tables {
err := d.gormDB.Exec(fmt.Sprintf("alter table %s delete where 1=1", tableName)).Error
if err != nil {
return errors.Trace(err)
}
}
} else {
tables := []string{d.ItemsTable(), d.FeedbackTable(), d.UsersTable()}
for _, tableName := range tables {
err := d.gormDB.Exec(fmt.Sprintf("DELETE FROM %s", tableName)).Error
if err != nil {
Expand Down Expand Up @@ -370,6 +398,14 @@ func (d *SQLDatabase) DeleteItem(ctx context.Context, namespace string, itemId s
Delete(&Feedback{}, "namespace = ? and item_id = ?", namespace, itemId).Error; err != nil {
return errors.Trace(err)
}
if d.driver == ClickHouse {
if err := d.gormDB.WithContext(ctx).Delete(&ItemFeedback{}, "item_id = ?", itemId).Error; err != nil {
return errors.Trace(err)
}
if err := d.gormDB.WithContext(ctx).Delete(&UserFeedback{}, "item_id = ?", itemId).Error; err != nil {
return errors.Trace(err)
}
}
return nil
}

Expand Down Expand Up @@ -523,11 +559,18 @@ func (d *SQLDatabase) GetItemStream(ctx context.Context, batchSize int, timeLimi

// GetItemFeedback returns feedback of a item from MySQL.
func (d *SQLDatabase) GetItemFeedback(ctx context.Context, namespace string, itemId string, feedbackTypes ...string) ([]Feedback, error) {
tx := d.gormDB.WithContext(ctx).Table(d.FeedbackTable()).
Select("namespace, user_id, item_id, feedback_type, time_stamp")
tx := d.gormDB.WithContext(ctx)
if d.driver == ClickHouse {
tx = tx.Table(d.ItemFeedbackTable())
} else {
tx = tx.Table(d.FeedbackTable())
}
tx = tx.Select("namespace, user_id, item_id, feedback_type, time_stamp")
switch d.driver {
case SQLite:
tx.Where("time_stamp <= DATETIME() AND namespace = ? AND item_id = ?", namespace, itemId)
case ClickHouse:
tx.Where("time_stamp <= NOW('UTC') AND namespace = ? AND item_id = ?", namespace, itemId)
default:
tx.Where("time_stamp <= NOW() AND namespace = ? AND item_id = ?", namespace, itemId)
}
Expand Down Expand Up @@ -591,6 +634,14 @@ func (d *SQLDatabase) DeleteUser(ctx context.Context, userId string) error {
if err := d.gormDB.WithContext(ctx).Delete(&Feedback{}, "user_id = ?", userId).Error; err != nil {
return errors.Trace(err)
}
if d.driver == ClickHouse {
if err := d.gormDB.WithContext(ctx).Delete(&ItemFeedback{}, "user_id = ?", userId).Error; err != nil {
return errors.Trace(err)
}
if err := d.gormDB.WithContext(ctx).Delete(&UserFeedback{}, "user_id = ?", userId).Error; err != nil {
return errors.Trace(err)
}
}
return nil
}

Expand Down Expand Up @@ -706,8 +757,13 @@ func (d *SQLDatabase) GetUserStream(ctx context.Context, batchSize int) (chan []

// GetUserFeedback returns feedback of a user from MySQL.
func (d *SQLDatabase) GetUserFeedback(ctx context.Context, namespace string, userId string, endTime *time.Time, feedbackTypes ...string) ([]Feedback, error) {
tx := d.gormDB.WithContext(ctx).Table(d.FeedbackTable()).
Select("namespace, feedback_type, user_id, item_id, time_stamp, comment").
tx := d.gormDB.WithContext(ctx)
if d.driver == ClickHouse {
tx = tx.Table(d.UserFeedbackTable())
} else {
tx = tx.Table(d.FeedbackTable())
}
tx = tx.Select("namespace, feedback_type, user_id, item_id, time_stamp, comment").
Where("namespace = ? and user_id = ?", namespace, userId)
if endTime != nil {
tx.Where("time_stamp <= ?", d.convertTimeZone(endTime))
Expand Down Expand Up @@ -1002,8 +1058,13 @@ func (d *SQLDatabase) GetFeedbackStream(ctx context.Context, batchSize int, scan

// GetUserItemFeedback gets a feedback by user id and item id from MySQL.
func (d *SQLDatabase) GetUserItemFeedback(ctx context.Context, namespace, userId, itemId string, feedbackTypes ...string) ([]Feedback, error) {
tx := d.gormDB.WithContext(ctx).Table(d.FeedbackTable()).
Select("namespace, feedback_type, user_id, item_id, time_stamp, comment").
tx := d.gormDB.WithContext(ctx)
if d.driver == ClickHouse {
tx = tx.Table(d.UserFeedbackTable())
} else {
tx = tx.Table(d.FeedbackTable())
}
tx = tx.Select("namespace, feedback_type, user_id, item_id, time_stamp, comment").
Where("namespace = ? AND user_id = ? AND item_id = ?", namespace, userId, itemId)
if len(feedbackTypes) > 0 {
tx.Where("feedback_type IN ?", feedbackTypes)
Expand All @@ -1026,18 +1087,32 @@ func (d *SQLDatabase) GetUserItemFeedback(ctx context.Context, namespace, userId

// DeleteUserItemFeedback deletes a feedback by user id and item id from MySQL.
func (d *SQLDatabase) DeleteUserItemFeedback(ctx context.Context, namespace, userId, itemId string, feedbackTypes ...string) (int, error) {
tx := d.gormDB.WithContext(ctx).Where("namespace = ? AND user_id = ? AND item_id = ?", namespace, userId, itemId)
if len(feedbackTypes) > 0 {
tx.Where("feedback_type IN ?", feedbackTypes)
deleteUserItemFeedback := func(value any) (int, error) {
tx := d.gormDB.WithContext(ctx).Where("namespace = ? AND user_id = ? AND item_id = ?", namespace, userId, itemId)
if len(feedbackTypes) > 0 {
tx.Where("feedback_type IN ?", feedbackTypes)
}
tx.Delete(value)
if tx.Error != nil {
return 0, errors.Trace(tx.Error)
}
return int(tx.RowsAffected), nil
}
tx.Delete(&Feedback{})
if tx.Error != nil {
return 0, errors.Trace(tx.Error)
rowAffected, err := deleteUserItemFeedback(&Feedback{})
if err != nil {
return 0, errors.Trace(err)
}
if tx.Error != nil && d.driver != ClickHouse {
return 0, errors.Trace(tx.Error)
if d.driver == ClickHouse {
_, err = deleteUserItemFeedback(&UserFeedback{})
if err != nil {
return 0, errors.Trace(err)
}
_, err = deleteUserItemFeedback(&ItemFeedback{})
if err != nil {
return 0, errors.Trace(err)
}
}
return int(tx.RowsAffected), nil
return rowAffected, nil
}

func (d *SQLDatabase) convertTimeZone(timestamp *time.Time) time.Time {
Expand Down
10 changes: 10 additions & 0 deletions storage/scheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,16 @@ func (tp TablePrefix) FeedbackTable() string {
return string(tp) + "feedback"
}

// UserFeedbackTable returns the materialized view of user feedback.
func (tp TablePrefix) UserFeedbackTable() string {
return string(tp) + "user_feedback"
}

// ItemFeedbackTable returns the materialized view of item feedback.
func (tp TablePrefix) ItemFeedbackTable() string {
return string(tp) + "item_feedback"
}

func (tp TablePrefix) Key(key string) string {
return string(tp) + key
}
Expand Down

0 comments on commit 4c203ac

Please sign in to comment.