Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GO-2837 space limits #78

Merged
merged 15 commits into from
Mar 19, 2024
6 changes: 4 additions & 2 deletions cmd/filenode.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (
"syscall"
"time"

"github.com/anyproto/any-sync/acl"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/consensus/consensusclient"
"github.com/anyproto/any-sync/coordinator/coordinatorclient"
"github.com/anyproto/any-sync/coordinator/nodeconfsource"
"github.com/anyproto/any-sync/metric"
Expand All @@ -31,7 +33,6 @@ import (
"github.com/anyproto/any-sync-filenode/deletelog"
"github.com/anyproto/any-sync-filenode/filenode"
"github.com/anyproto/any-sync-filenode/index"
"github.com/anyproto/any-sync-filenode/limit"
"github.com/anyproto/any-sync-filenode/redisprovider"

// import this to keep govvv in go.mod on mod tidy
Expand Down Expand Up @@ -112,7 +113,8 @@ func Bootstrap(a *app.App) {
Register(secureservice.New()).
Register(pool.New()).
Register(coordinatorclient.New()).
Register(limit.New()).
Register(consensusclient.New()).
Register(acl.New()).
Register(store()).
Register(redisprovider.New()).
Register(index.New()).
Expand Down
26 changes: 14 additions & 12 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,56 +43,58 @@ type Config struct {
NetworkStorePath string `yaml:"networkStorePath"`
NetworkUpdateIntervalSec int `yaml:"networkUpdateIntervalSec"`
CafeMigrateKey string `yaml:"cafeMigrateKey"`
DefaultLimit uint64 `yaml:"defaultLimit"`
PersistTtl uint `yaml:"persistTtl"`
}

func (c *Config) Init(a *app.App) (err error) {
return
}

func (c Config) Name() (name string) {
func (c *Config) Name() (name string) {
return CName
}

func (c Config) GetAccount() commonaccount.Config {
func (c *Config) GetAccount() commonaccount.Config {
return c.Account
}

func (c Config) GetS3Store() s3store.Config {
func (c *Config) GetS3Store() s3store.Config {
return c.S3Store
}

func (c Config) GetDevStore() FileDevStore {
func (c *Config) GetDevStore() FileDevStore {
return c.FileDevStore
}

func (c Config) GetDrpc() rpc.Config {
func (c *Config) GetDrpc() rpc.Config {
return c.Drpc
}

func (c Config) GetMetric() metric.Config {
func (c *Config) GetMetric() metric.Config {
return c.Metric
}

func (c Config) GetRedis() redisprovider.Config {
func (c *Config) GetRedis() redisprovider.Config {
return c.Redis
}

func (c Config) GetNodeConf() nodeconf.Configuration {
func (c *Config) GetNodeConf() nodeconf.Configuration {
return c.Network
}

func (c Config) GetNodeConfStorePath() string {
func (c *Config) GetNodeConfStorePath() string {
return c.NetworkStorePath
}

func (c Config) GetNodeConfUpdateInterval() int {
func (c *Config) GetNodeConfUpdateInterval() int {
return c.NetworkUpdateIntervalSec
}

func (c Config) GetYamux() yamux.Config {
func (c *Config) GetYamux() yamux.Config {
return c.Yamux
}

func (c Config) GetQuic() quic.Config {
func (c *Config) GetQuic() quic.Config {
return c.Quic
}
1 change: 1 addition & 0 deletions etc/any-sync-filenode.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,4 @@ network:
creationTime: 2023-04-13T20:29:23.453806629+02:00
networkStorePath: .
networkUpdateIntervalSec: 600
defaultLimit: 1073741824
133 changes: 96 additions & 37 deletions filenode/filenode.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,24 @@ package filenode
import (
"context"
"errors"
"slices"

"github.com/anyproto/any-sync/acl"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonfile/fileblockstore"
"github.com/anyproto/any-sync/commonfile/fileproto"
"github.com/anyproto/any-sync/commonfile/fileproto/fileprotoerr"
"github.com/anyproto/any-sync/metric"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/rpc/server"
"github.com/anyproto/any-sync/nodeconf"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"go.uber.org/zap"

"github.com/anyproto/any-sync-filenode/config"
"github.com/anyproto/any-sync-filenode/index"
"github.com/anyproto/any-sync-filenode/limit"
"github.com/anyproto/any-sync-filenode/store"
)

Expand All @@ -38,35 +41,43 @@ type Service interface {
}

type fileNode struct {
acl acl.AclService
index index.Index
store store.Store
limit limit.Limit
metric metric.Metric
nodeConf nodeconf.Service
migrateKey string
handler *rpcHandler
}

func (fn *fileNode) Init(a *app.App) (err error) {
fn.acl = a.MustComponent(acl.CName).(acl.AclService)
fn.store = a.MustComponent(fileblockstore.CName).(store.Store)
fn.index = a.MustComponent(index.CName).(index.Index)
fn.limit = a.MustComponent(limit.CName).(limit.Limit)
fn.handler = &rpcHandler{f: fn}
fn.metric = a.MustComponent(metric.CName).(metric.Metric)
fn.migrateKey = a.MustComponent(config.CName).(*config.Config).CafeMigrateKey
fn.nodeConf = a.MustComponent(nodeconf.CName).(nodeconf.Service)
return fileproto.DRPCRegisterFile(a.MustComponent(server.CName).(server.DRPCServer), fn.handler)
}

func (fn *fileNode) Name() (name string) {
return CName
}

func (fn *fileNode) Get(ctx context.Context, k cid.Cid) (blocks.Block, error) {
exists, err := fn.index.CidExists(ctx, k)
if err != nil {
return nil, err
}
if !exists {
return nil, fileprotoerr.ErrCIDNotFound
func (fn *fileNode) Get(ctx context.Context, k cid.Cid, wait bool) (blocks.Block, error) {
if wait {
if err := fn.index.WaitCidExists(ctx, k); err != nil {
return nil, err
}
} else {
exists, err := fn.index.CidExists(ctx, k)
if err != nil {
return nil, err
}
if !exists {
return nil, fileprotoerr.ErrCIDNotFound
}
}
return fn.store.Get(ctx, k)
}
Expand Down Expand Up @@ -95,6 +106,7 @@ func (fn *fileNode) Add(ctx context.Context, spaceId string, fileId string, bs [
if err = fn.index.BlocksAdd(ctx, bs); err != nil {
return err
}
fn.index.OnBlockUploaded(ctx, bs...)
}
cidEntries, err := fn.index.CidEntriesByBlocks(ctx, bs)
if err != nil {
Expand Down Expand Up @@ -159,78 +171,89 @@ func (fn *fileNode) StoreKey(ctx context.Context, spaceId string, checkLimit boo
if spaceId == "" {
return storageKey, fileprotoerr.ErrForbidden
}
// this call also confirms that space exists and valid
limitBytes, groupId, err := fn.limit.Check(ctx, spaceId)

identity, err := peer.CtxPubKey(ctx)
if err != nil {
return
return storageKey, fileprotoerr.ErrForbidden
}

ownerPubKey, err := fn.acl.OwnerPubKey(ctx, spaceId)
if err != nil {
log.WarnCtx(ctx, "acl ownerPubKey error", zap.Error(err))
return storageKey, fileprotoerr.ErrForbidden
}
storageKey = index.Key{
GroupId: groupId,
GroupId: ownerPubKey.Account(),
SpaceId: spaceId,
}

// if it not owner
if identity.Account() != storageKey.GroupId {
permissions, err := fn.acl.Permissions(ctx, identity, spaceId)
if err != nil {
log.WarnCtx(ctx, "acl permissions error", zap.Error(err))
return storageKey, fileprotoerr.ErrForbidden
}
if !permissions.CanWrite() {
return storageKey, fileprotoerr.ErrForbidden
}
}

if e := fn.index.Migrate(ctx, storageKey); e != nil {
log.WarnCtx(ctx, "space migrate error", zap.String("spaceId", spaceId), zap.Error(e))
}

if checkLimit {
info, e := fn.index.GroupInfo(ctx, groupId)
if e != nil {
return storageKey, e
}
if info.BytesUsage >= limitBytes {
return storageKey, fileprotoerr.ErrSpaceLimitExceeded
if err = fn.index.CheckLimits(ctx, storageKey); err != nil {
if errors.Is(err, index.ErrLimitExceed) {
return storageKey, fileprotoerr.ErrSpaceLimitExceeded
} else {
log.WarnCtx(ctx, "check limit error", zap.Error(err))
return storageKey, fileprotoerr.ErrUnexpected
}
}
}
return
}

func (fn *fileNode) SpaceInfo(ctx context.Context, spaceId string) (info *fileproto.SpaceInfoResponse, err error) {
var (
storageKey = index.Key{SpaceId: spaceId}
limitBytes uint64
)
if limitBytes, storageKey.GroupId, err = fn.limit.Check(ctx, spaceId); err != nil {
storageKey, err := fn.StoreKey(ctx, spaceId, false)
if err != nil {
return nil, err
}

if e := fn.index.Migrate(ctx, storageKey); e != nil {
log.WarnCtx(ctx, "space migrate error", zap.String("spaceId", spaceId), zap.Error(e))
}

groupInfo, err := fn.index.GroupInfo(ctx, storageKey.GroupId)
if err != nil {
return nil, err
}
if info, err = fn.spaceInfo(ctx, storageKey, groupInfo); err != nil {
return nil, err
}
info.LimitBytes = limitBytes
return
}

func (fn *fileNode) AccountInfo(ctx context.Context) (info *fileproto.AccountInfoResponse, err error) {
info = &fileproto.AccountInfoResponse{}
// we have space/identity validation in limit.Check
var groupId string

if info.LimitBytes, groupId, err = fn.limit.Check(ctx, ""); err != nil {
return nil, err
identity, err := peer.CtxPubKey(ctx)
if err != nil {
return nil, fileprotoerr.ErrForbidden
}

groupId := identity.Account()

groupInfo, err := fn.index.GroupInfo(ctx, groupId)
if err != nil {
return nil, err
}
info.TotalCidsCount = groupInfo.CidsCount
info.TotalUsageBytes = groupInfo.BytesUsage
info.LimitBytes = groupInfo.Limit
info.AccountLimitBytes = groupInfo.AccountLimit
for _, spaceId := range groupInfo.SpaceIds {
spaceInfo, err := fn.spaceInfo(ctx, index.Key{GroupId: groupId, SpaceId: spaceId}, groupInfo)
if err != nil {
return nil, err
}
spaceInfo.LimitBytes = info.LimitBytes
info.Spaces = append(info.Spaces, spaceInfo)
}
return
Expand All @@ -243,7 +266,13 @@ func (fn *fileNode) spaceInfo(ctx context.Context, key index.Key, groupInfo inde
if err != nil {
return nil, err
}
info.TotalUsageBytes = groupInfo.BytesUsage
if spaceInfo.Limit == 0 {
info.TotalUsageBytes = groupInfo.BytesUsage
info.LimitBytes = groupInfo.Limit
} else {
info.TotalUsageBytes = spaceInfo.BytesUsage
info.LimitBytes = spaceInfo.Limit
}
info.FilesCount = uint64(spaceInfo.FileCount)
info.CidsCount = spaceInfo.CidsCount
info.SpaceUsageBytes = spaceInfo.BytesUsage
Expand Down Expand Up @@ -293,3 +322,33 @@ func (fn *fileNode) MigrateCafe(ctx context.Context, bs []blocks.Block) error {
}
return nil
}

func (fn *fileNode) AccountLimitSet(ctx context.Context, identity string, limit uint64) (err error) {
peerId, err := peer.CtxPeerId(ctx)
if err != nil {
return
}
// check that call from the coordinator or the payment node
if !slices.Contains(fn.nodeConf.NodeTypes(peerId), nodeconf.NodeTypeCoordinator) &&
!slices.Contains(fn.nodeConf.NodeTypes(peerId), nodeconf.NodeTypePaymentProcessingNode) {
return fileprotoerr.ErrForbidden
}

return fn.index.SetGroupLimit(ctx, identity, limit)
}

func (fn *fileNode) SpaceLimitSet(ctx context.Context, spaceId string, limit uint64) (err error) {
storeKey, err := fn.StoreKey(ctx, spaceId, false)
if err != nil {
return
}
return fn.index.SetSpaceLimit(ctx, storeKey, limit)
}

func (fn *fileNode) FilesGet(ctx context.Context, spaceId string) (fileIds []string, err error) {
storeKey, err := fn.StoreKey(ctx, spaceId, false)
if err != nil {
return
}
return fn.index.FilesList(ctx, storeKey)
}
Loading
Loading