Skip to content

Commit

Permalink
Merge pull request #826 from sputn1ck/generalized_notif_stream
Browse files Browse the repository at this point in the history
Generalized notif stream
  • Loading branch information
sputn1ck authored Oct 8, 2024
2 parents 61c0b45 + 7409ae7 commit 8bc6b61
Show file tree
Hide file tree
Showing 10 changed files with 479 additions and 146 deletions.
18 changes: 9 additions & 9 deletions instantout/reservation/actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,24 +47,24 @@ type mockReservationClient struct {
mock.Mock
}

func (m *mockReservationClient) OpenReservation(ctx context.Context,
in *swapserverrpc.ServerOpenReservationRequest,
opts ...grpc.CallOption) (*swapserverrpc.ServerOpenReservationResponse,
func (m *mockReservationClient) ReservationNotificationStream(
ctx context.Context, in *swapserverrpc.ReservationNotificationRequest,
opts ...grpc.CallOption,
) (swapserverrpc.ReservationService_ReservationNotificationStreamClient,
error) {

args := m.Called(ctx, in, opts)
return args.Get(0).(*swapserverrpc.ServerOpenReservationResponse),
return args.Get(0).(swapserverrpc.ReservationService_ReservationNotificationStreamClient),
args.Error(1)
}

func (m *mockReservationClient) ReservationNotificationStream(
ctx context.Context, in *swapserverrpc.ReservationNotificationRequest,
opts ...grpc.CallOption,
) (swapserverrpc.ReservationService_ReservationNotificationStreamClient,
func (m *mockReservationClient) OpenReservation(ctx context.Context,
in *swapserverrpc.ServerOpenReservationRequest,
opts ...grpc.CallOption) (*swapserverrpc.ServerOpenReservationResponse,
error) {

args := m.Called(ctx, in, opts)
return args.Get(0).(swapserverrpc.ReservationService_ReservationNotificationStreamClient),
return args.Get(0).(*swapserverrpc.ServerOpenReservationResponse),
args.Error(1)
}

Expand Down
5 changes: 3 additions & 2 deletions instantout/reservation/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ type Config struct {
// swap server.
ReservationClient swapserverrpc.ReservationServiceClient

// FetchL402 is the function used to fetch the l402 token.
FetchL402 func(context.Context) error
// NotificationManager is the manager that handles the notification
// subscriptions.
NotificationManager NotificationManager
}

// FSM is the state machine that manages the reservation lifecycle.
Expand Down
9 changes: 9 additions & 0 deletions instantout/reservation/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package reservation
import (
"context"
"fmt"

"github.com/lightninglabs/loop/swapserverrpc"
)

var (
Expand Down Expand Up @@ -31,3 +33,10 @@ type Store interface {
// made.
ListReservations(ctx context.Context) ([]*Reservation, error)
}

// NotificationManager handles subscribing to incoming reservation
// subscriptions.
type NotificationManager interface {
SubscribeReservations(context.Context,
) <-chan *swapserverrpc.ServerReservationNotification
}
109 changes: 2 additions & 107 deletions instantout/reservation/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ type Manager struct {
// activeReservations contains all the active reservationsFSMs.
activeReservations map[ID]*FSM

// hasL402 is true if the client has a valid L402.
hasL402 bool

runCtx context.Context

sync.Mutex
Expand Down Expand Up @@ -59,22 +56,15 @@ func (m *Manager) Run(ctx context.Context, height int32) error {
return err
}

reservationResChan := make(
chan *reservationrpc.ServerReservationNotification,
)

err = m.RegisterReservationNotifications(reservationResChan)
if err != nil {
return err
}
ntfnChan := m.cfg.NotificationManager.SubscribeReservations(ctx)

for {
select {
case height := <-newBlockChan:
log.Debugf("Received block %v", height)
currentHeight = height

case reservationRes := <-reservationResChan:
case reservationRes := <-ntfnChan:
log.Debugf("Received reservation %x",
reservationRes.ReservationId)
_, err := m.newReservation(
Expand Down Expand Up @@ -157,101 +147,6 @@ func (m *Manager) newReservation(ctx context.Context, currentHeight uint32,
return reservationFSM, nil
}

// fetchL402 fetches the L402 from the server. This method will keep on
// retrying until it gets a valid response.
func (m *Manager) fetchL402(ctx context.Context) {
// Add a 0 timer so that we initially fetch the L402 immediately.
timer := time.NewTimer(0)
for {
select {
case <-ctx.Done():
return

case <-timer.C:
err := m.cfg.FetchL402(ctx)
if err != nil {
log.Warnf("Error fetching L402: %v", err)
timer.Reset(time.Second * 10)
continue
}
m.hasL402 = true
return
}
}
}

// RegisterReservationNotifications registers a new reservation notification
// stream.
func (m *Manager) RegisterReservationNotifications(
reservationChan chan *reservationrpc.ServerReservationNotification) error {

// In order to create a valid l402 we first are going to call
// the FetchL402 method. As a client might not have outbound capacity
// yet, we'll retry until we get a valid response.
if !m.hasL402 {
m.fetchL402(m.runCtx)
}

ctx, cancel := context.WithCancel(m.runCtx)

// We'll now subscribe to the reservation notifications.
reservationStream, err := m.cfg.ReservationClient.
ReservationNotificationStream(
ctx, &reservationrpc.ReservationNotificationRequest{},
)
if err != nil {
cancel()
return err
}

log.Debugf("Successfully subscribed to reservation notifications")

// We'll now start a goroutine that will forward all the reservation
// notifications to the reservationChan.
go func() {
for {
reservationRes, err := reservationStream.Recv()
if err == nil && reservationRes != nil {
log.Debugf("Received reservation %x",
reservationRes.ReservationId)
reservationChan <- reservationRes
continue
}
log.Errorf("Error receiving "+
"reservation: %v", err)

cancel()

// If we encounter an error, we'll
// try to reconnect.
for {
select {
case <-m.runCtx.Done():
return

case <-time.After(time.Second * 10):
log.Debugf("Reconnecting to " +
"reservation notifications")
err = m.RegisterReservationNotifications(
reservationChan,
)
if err != nil {
log.Errorf("Error "+
"reconnecting: %v", err)
continue
}

// If we were able to reconnect, we'll
// return.
return
}
}
}
}()

return nil
}

// RecoverReservations tries to recover all reservations that are still active
// from the database.
func (m *Manager) RecoverReservations(ctx context.Context) error {
Expand Down
38 changes: 15 additions & 23 deletions instantout/reservation/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)

var (
Expand Down Expand Up @@ -118,27 +117,22 @@ func newManagerTestContext(t *testing.T) *ManagerTestContext {

sendChan := make(chan *swapserverrpc.ServerReservationNotification)

mockReservationClient.On(
"ReservationNotificationStream", mock.Anything, mock.Anything,
mock.Anything,
).Return(
&dummyReservationNotificationServer{
SendChan: sendChan,
}, nil,
)

mockReservationClient.On(
"OpenReservation", mock.Anything, mock.Anything, mock.Anything,
).Return(
&swapserverrpc.ServerOpenReservationResponse{}, nil,
)

mockNtfnManager := &mockNtfnManager{
sendChan: sendChan,
}

cfg := &Config{
Store: store,
Wallet: mockLnd.WalletKit,
ChainNotifier: mockLnd.ChainNotifier,
FetchL402: func(context.Context) error { return nil },
ReservationClient: mockReservationClient,
Store: store,
Wallet: mockLnd.WalletKit,
ChainNotifier: mockLnd.ChainNotifier,
ReservationClient: mockReservationClient,
NotificationManager: mockNtfnManager,
}

manager := NewManager(cfg)
Expand All @@ -152,17 +146,15 @@ func newManagerTestContext(t *testing.T) *ManagerTestContext {
}
}

type dummyReservationNotificationServer struct {
grpc.ClientStream

// SendChan is the channel that is used to send notifications.
SendChan chan *swapserverrpc.ServerReservationNotification
type mockNtfnManager struct {
sendChan chan *swapserverrpc.ServerReservationNotification
}

func (d *dummyReservationNotificationServer) Recv() (
*swapserverrpc.ServerReservationNotification, error) {
func (m *mockNtfnManager) SubscribeReservations(
ctx context.Context,
) <-chan *swapserverrpc.ServerReservationNotification {

return <-d.SendChan, nil
return m.sendChan
}

func mustDecodeID(id string) ID {
Expand Down
31 changes: 26 additions & 5 deletions loopd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/lightninglabs/loop/loopd/perms"
"github.com/lightninglabs/loop/loopdb"
loop_looprpc "github.com/lightninglabs/loop/looprpc"
"github.com/lightninglabs/loop/notifications"
loop_swaprpc "github.com/lightninglabs/loop/swapserverrpc"
"github.com/lightninglabs/loop/sweepbatcher"
"github.com/lightningnetwork/lnd/clock"
Expand Down Expand Up @@ -501,21 +502,41 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
}
}

// Start the notification manager.
notificationCfg := &notifications.Config{
Client: loop_swaprpc.NewSwapServerClient(swapClient.Conn),
FetchL402: swapClient.Server.FetchL402,
}
notificationManager := notifications.NewManager(notificationCfg)

d.wg.Add(1)
go func() {
defer d.wg.Done()

log.Info("Starting notification manager")
err := notificationManager.Run(d.mainCtx)
if err != nil {
d.internalErrChan <- err
log.Errorf("Notification manager stopped: %v", err)
}
}()

var (
reservationManager *reservation.Manager
instantOutManager *instantout.Manager
)

// Create the reservation and instantout managers.
if d.cfg.EnableExperimental {
reservationStore := reservation.NewSQLStore(
loopdb.NewTypedStore[reservation.Querier](baseDb),
)
reservationConfig := &reservation.Config{
Store: reservationStore,
Wallet: d.lnd.WalletKit,
ChainNotifier: d.lnd.ChainNotifier,
ReservationClient: reservationClient,
FetchL402: swapClient.Server.FetchL402,
Store: reservationStore,
Wallet: d.lnd.WalletKit,
ChainNotifier: d.lnd.ChainNotifier,
ReservationClient: reservationClient,
NotificationManager: notificationManager,
}

reservationManager = reservation.NewManager(
Expand Down
4 changes: 4 additions & 0 deletions loopd/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/lightninglabs/loop/instantout/reservation"
"github.com/lightninglabs/loop/liquidity"
"github.com/lightninglabs/loop/loopdb"
"github.com/lightninglabs/loop/notifications"
"github.com/lightninglabs/loop/sweepbatcher"
"github.com/lightningnetwork/lnd"
"github.com/lightningnetwork/lnd/build"
Expand Down Expand Up @@ -48,6 +49,9 @@ func SetupLoggers(root *build.RotatingLogWriter, intercept signal.Interceptor) {
lnd.AddSubLogger(
root, instantout.Subsystem, intercept, instantout.UseLogger,
)
lnd.AddSubLogger(
root, notifications.Subsystem, intercept, notifications.UseLogger,
)
}

// genSubLogger creates a logger for a subsystem. We provide an instance of
Expand Down
26 changes: 26 additions & 0 deletions notifications/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package notifications

import (
"github.com/btcsuite/btclog"
"github.com/lightningnetwork/lnd/build"
)

// Subsystem defines the sub system name of this package.
const Subsystem = "NTFNS"

// log is a logger that is initialized with no output filters. This
// means the package will not perform any logging by default until the caller
// requests it.
var log btclog.Logger

// The default amount of logging is none.
func init() {
UseLogger(build.NewSubLogger(Subsystem, nil))
}

// UseLogger uses a specified Logger to output package logging info.
// This should be used in preference to SetLogWriter if the caller is also
// using btclog.
func UseLogger(logger btclog.Logger) {
log = logger
}
Loading

0 comments on commit 8bc6b61

Please sign in to comment.