Skip to content

Commit

Permalink
Merge pull request #78 from anyproto/GO-2837-space-limits
Browse files Browse the repository at this point in the history
GO-2837 space limits
  • Loading branch information
cheggaaa authored Mar 19, 2024
2 parents 5e1260a + 6f50cd9 commit 7dee8a2
Show file tree
Hide file tree
Showing 31 changed files with 1,769 additions and 658 deletions.
6 changes: 4 additions & 2 deletions cmd/filenode.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (
"syscall"
"time"

"github.com/anyproto/any-sync/acl"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/consensus/consensusclient"
"github.com/anyproto/any-sync/coordinator/coordinatorclient"
"github.com/anyproto/any-sync/coordinator/nodeconfsource"
"github.com/anyproto/any-sync/metric"
Expand All @@ -31,7 +33,6 @@ import (
"github.com/anyproto/any-sync-filenode/deletelog"
"github.com/anyproto/any-sync-filenode/filenode"
"github.com/anyproto/any-sync-filenode/index"
"github.com/anyproto/any-sync-filenode/limit"
"github.com/anyproto/any-sync-filenode/redisprovider"

// import this to keep govvv in go.mod on mod tidy
Expand Down Expand Up @@ -112,7 +113,8 @@ func Bootstrap(a *app.App) {
Register(secureservice.New()).
Register(pool.New()).
Register(coordinatorclient.New()).
Register(limit.New()).
Register(consensusclient.New()).
Register(acl.New()).
Register(store()).
Register(redisprovider.New()).
Register(index.New()).
Expand Down
26 changes: 14 additions & 12 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,56 +43,58 @@ type Config struct {
NetworkStorePath string `yaml:"networkStorePath"`
NetworkUpdateIntervalSec int `yaml:"networkUpdateIntervalSec"`
CafeMigrateKey string `yaml:"cafeMigrateKey"`
DefaultLimit uint64 `yaml:"defaultLimit"`
PersistTtl uint `yaml:"persistTtl"`
}

func (c *Config) Init(a *app.App) (err error) {
return
}

func (c Config) Name() (name string) {
func (c *Config) Name() (name string) {
return CName
}

func (c Config) GetAccount() commonaccount.Config {
func (c *Config) GetAccount() commonaccount.Config {
return c.Account
}

func (c Config) GetS3Store() s3store.Config {
func (c *Config) GetS3Store() s3store.Config {
return c.S3Store
}

func (c Config) GetDevStore() FileDevStore {
func (c *Config) GetDevStore() FileDevStore {
return c.FileDevStore
}

func (c Config) GetDrpc() rpc.Config {
func (c *Config) GetDrpc() rpc.Config {
return c.Drpc
}

func (c Config) GetMetric() metric.Config {
func (c *Config) GetMetric() metric.Config {
return c.Metric
}

func (c Config) GetRedis() redisprovider.Config {
func (c *Config) GetRedis() redisprovider.Config {
return c.Redis
}

func (c Config) GetNodeConf() nodeconf.Configuration {
func (c *Config) GetNodeConf() nodeconf.Configuration {
return c.Network
}

func (c Config) GetNodeConfStorePath() string {
func (c *Config) GetNodeConfStorePath() string {
return c.NetworkStorePath
}

func (c Config) GetNodeConfUpdateInterval() int {
func (c *Config) GetNodeConfUpdateInterval() int {
return c.NetworkUpdateIntervalSec
}

func (c Config) GetYamux() yamux.Config {
func (c *Config) GetYamux() yamux.Config {
return c.Yamux
}

func (c Config) GetQuic() quic.Config {
func (c *Config) GetQuic() quic.Config {
return c.Quic
}
1 change: 1 addition & 0 deletions etc/any-sync-filenode.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,4 @@ network:
creationTime: 2023-04-13T20:29:23.453806629+02:00
networkStorePath: .
networkUpdateIntervalSec: 600
defaultLimit: 1073741824
133 changes: 96 additions & 37 deletions filenode/filenode.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,24 @@ package filenode
import (
"context"
"errors"
"slices"

"github.com/anyproto/any-sync/acl"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonfile/fileblockstore"
"github.com/anyproto/any-sync/commonfile/fileproto"
"github.com/anyproto/any-sync/commonfile/fileproto/fileprotoerr"
"github.com/anyproto/any-sync/metric"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/rpc/server"
"github.com/anyproto/any-sync/nodeconf"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"go.uber.org/zap"

"github.com/anyproto/any-sync-filenode/config"
"github.com/anyproto/any-sync-filenode/index"
"github.com/anyproto/any-sync-filenode/limit"
"github.com/anyproto/any-sync-filenode/store"
)

Expand All @@ -38,35 +41,43 @@ type Service interface {
}

type fileNode struct {
acl acl.AclService
index index.Index
store store.Store
limit limit.Limit
metric metric.Metric
nodeConf nodeconf.Service
migrateKey string
handler *rpcHandler
}

func (fn *fileNode) Init(a *app.App) (err error) {
fn.acl = a.MustComponent(acl.CName).(acl.AclService)
fn.store = a.MustComponent(fileblockstore.CName).(store.Store)
fn.index = a.MustComponent(index.CName).(index.Index)
fn.limit = a.MustComponent(limit.CName).(limit.Limit)
fn.handler = &rpcHandler{f: fn}
fn.metric = a.MustComponent(metric.CName).(metric.Metric)
fn.migrateKey = a.MustComponent(config.CName).(*config.Config).CafeMigrateKey
fn.nodeConf = a.MustComponent(nodeconf.CName).(nodeconf.Service)
return fileproto.DRPCRegisterFile(a.MustComponent(server.CName).(server.DRPCServer), fn.handler)
}

func (fn *fileNode) Name() (name string) {
return CName
}

func (fn *fileNode) Get(ctx context.Context, k cid.Cid) (blocks.Block, error) {
exists, err := fn.index.CidExists(ctx, k)
if err != nil {
return nil, err
}
if !exists {
return nil, fileprotoerr.ErrCIDNotFound
func (fn *fileNode) Get(ctx context.Context, k cid.Cid, wait bool) (blocks.Block, error) {
if wait {
if err := fn.index.WaitCidExists(ctx, k); err != nil {
return nil, err
}
} else {
exists, err := fn.index.CidExists(ctx, k)
if err != nil {
return nil, err
}
if !exists {
return nil, fileprotoerr.ErrCIDNotFound
}
}
return fn.store.Get(ctx, k)
}
Expand Down Expand Up @@ -95,6 +106,7 @@ func (fn *fileNode) Add(ctx context.Context, spaceId string, fileId string, bs [
if err = fn.index.BlocksAdd(ctx, bs); err != nil {
return err
}
fn.index.OnBlockUploaded(ctx, bs...)
}
cidEntries, err := fn.index.CidEntriesByBlocks(ctx, bs)
if err != nil {
Expand Down Expand Up @@ -159,78 +171,89 @@ func (fn *fileNode) StoreKey(ctx context.Context, spaceId string, checkLimit boo
if spaceId == "" {
return storageKey, fileprotoerr.ErrForbidden
}
// this call also confirms that space exists and valid
limitBytes, groupId, err := fn.limit.Check(ctx, spaceId)

identity, err := peer.CtxPubKey(ctx)
if err != nil {
return
return storageKey, fileprotoerr.ErrForbidden
}

ownerPubKey, err := fn.acl.OwnerPubKey(ctx, spaceId)
if err != nil {
log.WarnCtx(ctx, "acl ownerPubKey error", zap.Error(err))
return storageKey, fileprotoerr.ErrForbidden
}
storageKey = index.Key{
GroupId: groupId,
GroupId: ownerPubKey.Account(),
SpaceId: spaceId,
}

// if it not owner
if identity.Account() != storageKey.GroupId {
permissions, err := fn.acl.Permissions(ctx, identity, spaceId)
if err != nil {
log.WarnCtx(ctx, "acl permissions error", zap.Error(err))
return storageKey, fileprotoerr.ErrForbidden
}
if !permissions.CanWrite() {
return storageKey, fileprotoerr.ErrForbidden
}
}

if e := fn.index.Migrate(ctx, storageKey); e != nil {
log.WarnCtx(ctx, "space migrate error", zap.String("spaceId", spaceId), zap.Error(e))
}

if checkLimit {
info, e := fn.index.GroupInfo(ctx, groupId)
if e != nil {
return storageKey, e
}
if info.BytesUsage >= limitBytes {
return storageKey, fileprotoerr.ErrSpaceLimitExceeded
if err = fn.index.CheckLimits(ctx, storageKey); err != nil {
if errors.Is(err, index.ErrLimitExceed) {
return storageKey, fileprotoerr.ErrSpaceLimitExceeded
} else {
log.WarnCtx(ctx, "check limit error", zap.Error(err))
return storageKey, fileprotoerr.ErrUnexpected
}
}
}
return
}

func (fn *fileNode) SpaceInfo(ctx context.Context, spaceId string) (info *fileproto.SpaceInfoResponse, err error) {
var (
storageKey = index.Key{SpaceId: spaceId}
limitBytes uint64
)
if limitBytes, storageKey.GroupId, err = fn.limit.Check(ctx, spaceId); err != nil {
storageKey, err := fn.StoreKey(ctx, spaceId, false)
if err != nil {
return nil, err
}

if e := fn.index.Migrate(ctx, storageKey); e != nil {
log.WarnCtx(ctx, "space migrate error", zap.String("spaceId", spaceId), zap.Error(e))
}

groupInfo, err := fn.index.GroupInfo(ctx, storageKey.GroupId)
if err != nil {
return nil, err
}
if info, err = fn.spaceInfo(ctx, storageKey, groupInfo); err != nil {
return nil, err
}
info.LimitBytes = limitBytes
return
}

func (fn *fileNode) AccountInfo(ctx context.Context) (info *fileproto.AccountInfoResponse, err error) {
info = &fileproto.AccountInfoResponse{}
// we have space/identity validation in limit.Check
var groupId string

if info.LimitBytes, groupId, err = fn.limit.Check(ctx, ""); err != nil {
return nil, err
identity, err := peer.CtxPubKey(ctx)
if err != nil {
return nil, fileprotoerr.ErrForbidden
}

groupId := identity.Account()

groupInfo, err := fn.index.GroupInfo(ctx, groupId)
if err != nil {
return nil, err
}
info.TotalCidsCount = groupInfo.CidsCount
info.TotalUsageBytes = groupInfo.BytesUsage
info.LimitBytes = groupInfo.Limit
info.AccountLimitBytes = groupInfo.AccountLimit
for _, spaceId := range groupInfo.SpaceIds {
spaceInfo, err := fn.spaceInfo(ctx, index.Key{GroupId: groupId, SpaceId: spaceId}, groupInfo)
if err != nil {
return nil, err
}
spaceInfo.LimitBytes = info.LimitBytes
info.Spaces = append(info.Spaces, spaceInfo)
}
return
Expand All @@ -243,7 +266,13 @@ func (fn *fileNode) spaceInfo(ctx context.Context, key index.Key, groupInfo inde
if err != nil {
return nil, err
}
info.TotalUsageBytes = groupInfo.BytesUsage
if spaceInfo.Limit == 0 {
info.TotalUsageBytes = groupInfo.BytesUsage
info.LimitBytes = groupInfo.Limit
} else {
info.TotalUsageBytes = spaceInfo.BytesUsage
info.LimitBytes = spaceInfo.Limit
}
info.FilesCount = uint64(spaceInfo.FileCount)
info.CidsCount = spaceInfo.CidsCount
info.SpaceUsageBytes = spaceInfo.BytesUsage
Expand Down Expand Up @@ -293,3 +322,33 @@ func (fn *fileNode) MigrateCafe(ctx context.Context, bs []blocks.Block) error {
}
return nil
}

func (fn *fileNode) AccountLimitSet(ctx context.Context, identity string, limit uint64) (err error) {
peerId, err := peer.CtxPeerId(ctx)
if err != nil {
return
}
// check that call from the coordinator or the payment node
if !slices.Contains(fn.nodeConf.NodeTypes(peerId), nodeconf.NodeTypeCoordinator) &&
!slices.Contains(fn.nodeConf.NodeTypes(peerId), nodeconf.NodeTypePaymentProcessingNode) {
return fileprotoerr.ErrForbidden
}

return fn.index.SetGroupLimit(ctx, identity, limit)
}

func (fn *fileNode) SpaceLimitSet(ctx context.Context, spaceId string, limit uint64) (err error) {
storeKey, err := fn.StoreKey(ctx, spaceId, false)
if err != nil {
return
}
return fn.index.SetSpaceLimit(ctx, storeKey, limit)
}

func (fn *fileNode) FilesGet(ctx context.Context, spaceId string) (fileIds []string, err error) {
storeKey, err := fn.StoreKey(ctx, spaceId, false)
if err != nil {
return
}
return fn.index.FilesList(ctx, storeKey)
}
Loading

0 comments on commit 7dee8a2

Please sign in to comment.