[RSDK-8798] - Async save command #15

Merged
14 commits merged into main on Sep 25, 2024

Conversation

@seanavery (Collaborator) commented Sep 23, 2024

Description

This PR adds an async option to the save do_command, letting users fire off save commands that include the most recent video segment. Right now it simply waits for the segment duration; this could be further optimized with a filesystem watcher that executes the save as soon as the segment is finished.

  • Executes the async save command in a goroutine that waits for the segment duration before running.
  • Refactors filename handling for concat calls.
    • Separates concat mp4 file name generation into a helper.
    • Moves concat txt files to the /tmp path.
    • Generates unique concat txt file names with a UUID to allow concurrent operations.

Test

  • Async save cmd ✅
  • Sync save cmd ✅
  • Fetch cmd still works ✅
  • Cleanup helper removes stale concat txt files if leftover ✅
  • Added unit test for async save cmd ✅
  • Added unit test for cleaning up leftover concat txt files ✅

@seanavery seanavery requested review from randhid and hexbabe and removed request for randhid September 23, 2024 19:20
README.md Outdated
cam/cam.go Outdated
Comment on lines 276 to 277
uploadFilePath := generateOutputFilePath(vs.name.Name, formatDateTimeToString(from), metadata, vs.uploadPath)
uploadFileName := filepath.Base(uploadFilePath)
With a bit of a refactor to have vs embed a resource.Named interface and pass it conf.Name.AsNamed() in the constructor/reconfiguration, you could shorten getting the name to a vs.Name call. Also, test these two functions to make sure they're giving you what you expect.

Collaborator Author

Yeah good call out. Looks like conf.ResourceName().AsNamed() returns Named, but I really just care about the string value.

Shouldn't I just pull out the string with conf.ResourceName().Name directly into the struct?

Collaborator Author

To satisfy the Camera interface, it looks like I need a Name method on vs that returns resource.Name.

You can pull it out with .ShortName, I think, but if it doesn't follow what I think other resource interfaces follow, then we can just leave it be.

cam/concater.go
Comment on lines +278 to +306
func validateSaveCommand(command map[string]interface{}) (time.Time, time.Time, string, bool, error) {
	fromStr, ok := command["from"].(string)
	if !ok {
		return time.Time{}, time.Time{}, "", false, errors.New("from timestamp not found")
	}
	from, err := parseDateTimeString(fromStr)
	if err != nil {
		return time.Time{}, time.Time{}, "", false, err
	}
	toStr, ok := command["to"].(string)
	if !ok {
		return time.Time{}, time.Time{}, "", false, errors.New("to timestamp not found")
	}
	to, err := parseDateTimeString(toStr)
	if err != nil {
		return time.Time{}, time.Time{}, "", false, err
	}
	if from.After(to) {
		return time.Time{}, time.Time{}, "", false, errors.New("from timestamp is after to timestamp")
	}
	metadata, ok := command["metadata"].(string)
	if !ok {
		metadata = ""
	}
	async, ok := command["async"].(bool)
	if !ok {
		async = false
	}
	return from, to, metadata, async, nil
}
The returns here are getting pretty long. That's okay, but if you need more in the future you may want to break this up into separate validation functions, return a struct, or use a function in the scope of DoCommand. Nothing to do now.
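One way the struct-return suggestion could look is sketched below. This is not the PR's code: `saveRequest`, `parseSaveCommand`, and the timestamp layout are illustrative assumptions (the module's actual datetime format lives inside its own `parseDateTimeString`).

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// saveRequest bundles the validated DoCommand arguments so the
// function has a single result instead of a five-value return.
type saveRequest struct {
	From, To time.Time
	Metadata string
	Async    bool
}

// timeLayout is a placeholder layout for this sketch.
const timeLayout = "2006-01-02_15-04-05"

func parseSaveCommand(command map[string]interface{}) (saveRequest, error) {
	var req saveRequest
	fromStr, ok := command["from"].(string)
	if !ok {
		return req, errors.New("from timestamp not found")
	}
	var err error
	if req.From, err = time.Parse(timeLayout, fromStr); err != nil {
		return req, err
	}
	toStr, ok := command["to"].(string)
	if !ok {
		return req, errors.New("to timestamp not found")
	}
	if req.To, err = time.Parse(timeLayout, toStr); err != nil {
		return req, err
	}
	if req.From.After(req.To) {
		return req, errors.New("from timestamp is after to timestamp")
	}
	// metadata and async stay optional, defaulting to "" and false.
	req.Metadata, _ = command["metadata"].(string)
	req.Async, _ = command["async"].(bool)
	return req, nil
}

func main() {
	req, err := parseSaveCommand(map[string]interface{}{
		"from":  "2024-09-25_16-00-00",
		"to":    "2024-09-25_16-05-00",
		"async": true,
	})
	fmt.Println(req.Async, err)
}
```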

tests/save_test.go
cam/cam.go Outdated
cam/cam.go Outdated
tests/save_test.go
@hexbabe (Collaborator) left a comment
I'm slightly afraid that long segment durations could cause gRPC timeouts. Have we tested long segment durations with the various SDKs?

cam/cam.go Outdated
cam/cam.go Outdated
@@ -112,6 +112,15 @@ func TestSaveDoCommand(t *testing.T) {
"metadata": "test-metadata",
}

// Valid async save
Collaborator
Super optional, but looking ahead: we could probably add a test now for a time range that overlaps the current time/the segment currently being written.

Probably an important test for the future feature of not waiting the entire segment duration every time.

@seanavery (Collaborator Author) Sep 24, 2024
Yes, I should add such a test. Running into an issue where viam-server crashes when waiting for the async call in the test run; will try to get it working tomorrow.

Collaborator
amazin

Collaborator Author

Ah, I cannot seem to get around an issue where my module crashes when sleeping in a test case. Not sure at this point why waiting in a test is interfering with the robot runtime.

\_ Assertion ((src_linesize) >= 0 ? (src_linesize) : (-(src_linesize))) >= bytewidth failed at libavutil/imgutils.c:350
2024-09-25T16:27:04.727Z	ERROR	video-store-module.modmanager.process.video-storage_/host/bin/video-store.StdErr	pexec/managed_process.go:275	
\_ SIGABRT: abort

I have tried a few strategies, including running the sleep in a goroutine with a wait group, with no success.

Would you be ok if I throw this into the test improvements ticket?

Collaborator
Yeah that sounds good to me. Not blocking

Comment on lines +282 to +283
vs.workers.Add(func(ctx context.Context) {
vs.asyncSave(ctx, from, to, uploadFilePath)
Things to consider:
1 - We add a worker with each DoCommand request, do we want them to interrupt already working workers?
2 - Should another call interrupt this worker?
3 - workers.Stop will stop all workers so if you want them to be separate from your other workers, give this new set of workers a new variable in the struct.

Collaborator Author
1 - We add a worker with each DoCommand request, do we want them to interrupt already working workers?

I do not want the do_command workers to interfere with the other long-running workers (processFrames, deleter). Adding workers should not interfere:

type StoppableWorkers struct {
	// Use a `sync.RWMutex` instead of a `sync.Mutex` so that additions of new
	// workers do not lock with each other in any way. We want
	// as-fast-as-possible worker addition.
	mu         sync.RWMutex
	ctx        context.Context
	cancelFunc func()

	workers sync.WaitGroup
}

2 - Should another call interrupt this worker?

The short-lived asyncSave routine should just die out when finished, or exit early via the cancel call.

3 - workers.Stop will stop all workers so if you want them to be separate from your other workers, give this new set of workers a new variable in the struct.

It is ok to stop all workers during Close; processFrames, deleter, and all outstanding async calls can be shut down together.

@seanavery seanavery merged commit b9d2829 into main Sep 25, 2024
4 checks passed