Skip to content

Commit

Permalink
MM-61225: Revert session pooling (mattermost#28901)
Browse files Browse the repository at this point in the history
This originated from mattermost#15249.

However, the original idea was discarded mattermost#15249 (comment)
as being too complicated to implement. Then I had another
idea to implement it just for session objects.

My thinking was that since every single request allocates a new
session struct, it would be good to use a sync.Pool for that.

However, 4 years later, now we know that the primary bottleneck
in app performance comes from websocket event marshalling.
Therefore, while it would be good to do this, it is difficult
to do it correctly (as shown by the numerous racy tests).

Hence, reverting this.

```release-note
NONE
```
  • Loading branch information
agnivade authored Oct 24, 2024
1 parent a8cf496 commit 6075b1c
Show file tree
Hide file tree
Showing 12 changed files with 21 additions and 86 deletions.
3 changes: 1 addition & 2 deletions server/channels/app/app_iface.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 4 additions & 5 deletions server/channels/app/ldap.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,20 @@ import (
// SyncLdap starts an LDAP sync job.
// If includeRemovedMembers is true, then members who left or were removed from a team/channel will
// be re-added; otherwise, they will not be re-added.
func (a *App) SyncLdap(rctx request.CTX, includeRemovedMembers bool) {
rctx = rctx.Clone()
func (a *App) SyncLdap(c request.CTX, includeRemovedMembers bool) {
a.Srv().Go(func() {
if license := a.Srv().License(); license != nil && *license.Features.LDAP {
if !*a.Config().LdapSettings.EnableSync {
rctx.Logger().Error("LdapSettings.EnableSync is set to false. Skipping LDAP sync.")
c.Logger().Error("LdapSettings.EnableSync is set to false. Skipping LDAP sync.")
return
}

ldapI := a.Ldap()
if ldapI == nil {
rctx.Logger().Error("Not executing ldap sync because ldap is not available")
c.Logger().Error("Not executing ldap sync because ldap is not available")
return
}
ldapI.StartSynchronizeJob(rctx, false, includeRemovedMembers)
ldapI.StartSynchronizeJob(c, false, includeRemovedMembers)
}
})
}
Expand Down
19 changes: 2 additions & 17 deletions server/channels/app/opentracing/opentracing_layer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 0 additions & 6 deletions server/channels/app/platform/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ type PlatformService struct {
cacheProvider cache.Provider
statusCache cache.Cache
sessionCache cache.Cache
sessionPool sync.Pool

asymmetricSigningKey atomic.Pointer[ecdsa.PrivateKey]
clientConfig atomic.Value
Expand Down Expand Up @@ -124,11 +123,6 @@ func New(sc ServiceConfig, options ...Option) (*PlatformService, error) {
WebSocketRouter: &WebSocketRouter{
handlers: make(map[string]webSocketHandler),
},
sessionPool: sync.Pool{
New: func() any {
return &model.Session{}
},
},
licenseListeners: map[string]func(*model.License, *model.License){},
additionalClusterHandlers: map[model.ClusterEvent]einterfaces.ClusterMessageHandler{},
}
Expand Down
21 changes: 3 additions & 18 deletions server/channels/app/platform/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,6 @@ import (
"github.com/mattermost/mattermost/server/v8/platform/services/cache"
)

func (ps *PlatformService) ReturnSessionToPool(session *model.Session) {
if session != nil {
session.Id = ""
// All existing prop fields are cleared once the session is retrieved from the pool.
// To speed up that process, clear the props here to avoid doing that in the hot path.
//
// If the request handler spawns a goroutine that uses the session, it might race with this code.
// In that case, the handler should copy the session and use the copy in the goroutine.
clear(session.Props)
ps.sessionPool.Put(session)
}
}

func (ps *PlatformService) CreateSession(c request.CTX, session *model.Session) (*model.Session, error) {
session.Token = ""

Expand Down Expand Up @@ -133,8 +120,8 @@ func (ps *PlatformService) ClearAllUsersSessionCache() {
}

func (ps *PlatformService) GetSession(c request.CTX, token string) (*model.Session, error) {
var session = ps.sessionPool.Get().(*model.Session)
if err := ps.sessionCache.Get(token, session); err == nil {
var session model.Session
if err := ps.sessionCache.Get(token, &session); err == nil {
if m := ps.metricsIFace; m != nil {
m.IncrementMemCacheHitCounterSession()
}
Expand All @@ -145,7 +132,7 @@ func (ps *PlatformService) GetSession(c request.CTX, token string) (*model.Sessi
}

if session.Id != "" {
return session, nil
return &session, nil
}

return ps.GetSessionContext(c, token)
Expand Down Expand Up @@ -206,8 +193,6 @@ func (ps *PlatformService) RevokeSession(c request.CTX, session *model.Session)
func (ps *PlatformService) RevokeAccessToken(c request.CTX, token string) error {
session, _ := ps.GetSession(c, token)

defer ps.ReturnSessionToPool(session)

schan := make(chan error, 1)
go func() {
schan <- ps.Store.Session().Remove(token)
Expand Down
27 changes: 10 additions & 17 deletions server/channels/app/platform/web_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,15 @@ func (ps *PlatformService) PopulateWebConnConfig(s *model.Session, cfg *WebConnC

// NewWebConn returns a new WebConn instance.
func (ps *PlatformService) NewWebConn(cfg *WebConnConfig, suite SuiteIFace, runner HookRunner) *WebConn {
userID := cfg.Session.UserId
session := cfg.Session
if cfg.Session.UserId != "" {
ps.Go(func() {
ps.SetStatusOnline(userID, false)
ps.UpdateLastActivityAtIfNeeded(session)
})
}

// Disable TCP_NO_DELAY for higher throughput
var tcpConn *net.TCPConn
switch conn := cfg.WebSocket.UnderlyingConn().(type) {
Expand Down Expand Up @@ -245,23 +254,9 @@ func (ps *PlatformService) NewWebConn(cfg *WebConnConfig, suite SuiteIFace, runn
remoteAddress: cfg.RemoteAddress,
xForwardedFor: cfg.XForwardedFor,
}

wc.SetSession(&cfg.Session)
userID := cfg.Session.UserId
if userID != "" {
// UpdateLastActivityAtIfNeeded might block if the Hub is busy.
// Create a goroutine to avoid blocking the creation of the websocket connection.
ps.Go(func() {
ps.SetStatusOnline(userID, false)
session := wc.GetSession()
if session != nil {
ps.UpdateLastActivityAtIfNeeded(*session)
}
})
}

wc.Active.Store(cfg.Active)

wc.SetSession(&cfg.Session)
wc.SetSessionToken(cfg.Session.Token)
wc.SetSessionExpiresAt(cfg.Session.ExpiresAt)
wc.SetConnectionID(cfg.ConnectionID)
Expand Down Expand Up @@ -393,8 +388,6 @@ func (wc *WebConn) GetSession() *model.Session {

// SetSession sets the session of the connection.
func (wc *WebConn) SetSession(v *model.Session) {
// Clone the session first as WebConn takes ownership of the object
// and the web.Hub will return it to the [sync.Pool] once the WebConn gets removed.
if v != nil {
v = v.DeepCopy()
}
Expand Down
2 changes: 0 additions & 2 deletions server/channels/app/platform/web_hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,8 +649,6 @@ func (i *hubConnectionIndex) Add(wc *WebConn) {
}

func (i *hubConnectionIndex) Remove(wc *WebConn) {
wc.Platform.ReturnSessionToPool(wc.GetSession())

userConnIndex, ok := i.byConnection[wc]
if !ok {
return
Expand Down
1 change: 0 additions & 1 deletion server/channels/app/plugin_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ func (ch *Channels) servePluginRequest(w http.ResponseWriter, r *http.Request, h
r.Header.Del("Mattermost-User-Id")
if token != "" {
session, err := New(ServerConnector(ch)).GetSession(token)
defer ch.srv.platform.ReturnSessionToPool(session)

csrfCheckPassed := false

Expand Down
4 changes: 0 additions & 4 deletions server/channels/app/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,10 +223,6 @@ func (a *App) RevokeSessionsFromAllUsers() *model.AppError {
return nil
}

func (a *App) ReturnSessionToPool(session *model.Session) {
a.ch.srv.platform.ReturnSessionToPool(session)
}

func (a *App) ClearSessionCacheForUser(userID string) {
a.ch.srv.platform.ClearUserSessionCache(userID)
}
Expand Down
1 change: 0 additions & 1 deletion server/channels/web/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,6 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

if token != "" && tokenLocation != app.TokenLocationCloudHeader && tokenLocation != app.TokenLocationRemoteClusterHeader {
session, err := c.App.GetSession(token)
defer c.App.ReturnSessionToPool(session)

if err != nil {
c.Logger.Info("Invalid session", mlog.Err(err))
Expand Down
1 change: 0 additions & 1 deletion server/channels/wsapi/websocket_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ func (wh webSocketHandler) ServeWebSocket(conn *platform.WebConn, r *model.WebSo
return
}
session, sessionErr := wh.app.GetSession(conn.GetSessionToken())
defer wh.app.ReturnSessionToPool(session)

if sessionErr != nil {
mlog.Error(
Expand Down
13 changes: 1 addition & 12 deletions server/public/shared/request/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,9 @@ func TestContext(t testing.TB) *Context {
return EmptyContext(logger)
}

// Clone creates a deep copy of [CTX].
// It should only be used to pass a [CTX] to a separate goroutine that
// has a longer lifespan than the main goroutine handling the request.
// It should be used sparsely as coping [CTX] is often unnecessary.
func (c *Context) Clone() CTX {
return c.clone()
}

// clone creates a deep copy of [Context], allowing clones to apply per-request changes.
// It unexported to prevent leaking the [Context] type from the [CTX] interface.
// clone creates a shallow copy of Context, allowing clones to apply per-request changes.
func (c *Context) clone() *Context {
cCopy := *c
cCopy.session = *c.session.DeepCopy()
return &cCopy
}

Expand Down Expand Up @@ -183,5 +173,4 @@ type CTX interface {
WithLogger(mlog.LoggerIFace) CTX
WithContext(ctx context.Context) CTX
With(func(ctx CTX) CTX) CTX
Clone() CTX
}

0 comments on commit 6075b1c

Please sign in to comment.