Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RSDK-8647 - Add save do_command #6

Closed
wants to merge 27 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
FFmpeg
*.mp4
.env
*.DS_Store
*.DS_Store
bin
11 changes: 8 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,20 @@ FFMPEG_OPTS ?= --prefix=$(FFMPEG_BUILD) \
--disable-doc \
--disable-everything \
--enable-static \
--enable-libx264 \
--enable-gpl \
--enable-libx264 \
--enable-gpl \
--enable-encoder=libx264 \
--enable-muxer=segment \
--enable-muxer=mp4 \
--enable-demuxer=segment \
--enable-demuxer=concat \
--enable-demuxer=mov \
--enable-demuxer=mp4 \
--enable-parser=h264 \
--enable-protocol=file \
--enable-protocol=concat \
--enable-protocol=crypto
--enable-protocol=crypto \
--enable-bsf=h264_mp4toannexb

# TODO: cleanup libx264 static link
CGO_LDFLAGS := "-L$(FFMPEG_BUILD)/lib -lavcodec -lavutil -lavformat -l:libjpeg.a /usr/lib/aarch64-linux-gnu/libx264.a -lz"
Expand Down
43 changes: 29 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Video Storage
The `video-store` module brings security camera functionality to your smart machine! The module consumes a source [Camera](https://docs.viam.com/components/camera/) and a [Vision Service](https://docs.viam.com/services/vision/), saves the camera output as video files to disk, and filters which video clips are uploaded to the cloud based on triggers from the vision service.
The `video-store` module brings security camera functionality to your smart machine! The module consumes a source [Camera](https://docs.viam.com/components/camera/) and saves the camera output as video files on disk. You can then later request to upload video slices to the cloud using the [save](#save) command, or request the video bytes directly using the [fetch](#fetch) command.

> **Note:** This component is a work in progress and is not yet fully implemented.

Expand All @@ -9,15 +9,15 @@ Fill in the attributes as applicable to the component, according to the example

```json
{
"name": "fv-cam",
"name": "video-store-1",
"namespace": "rdk",
"type": "camera",
"model": "viam:camera:video-store",
"model": "viam:video:storage",
"attributes": {
"camera": "webcam-1", // name of the camera to use
"vision": "vision-service-1", // name of the vision service dependency
"camera": "webcam-1",
"vision": "vision-service-1",
"storage": {
"clip_seconds": 30,
"segment_seconds": 30,
"size_gb": 100
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the save DoCommand have an optional directory for video storage and sync upload?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not in do_command, those are optional attrs in the module - will include it in the README example.

},
"video": {
Expand All @@ -26,18 +26,14 @@ Fill in the attributes as applicable to the component, according to the example
"bitrate": 1000000,
"preset": "medium"
},
"objects": {
"Person": 0.8 // label key and threshold value
},
"cam_props": { // camera properties of the source camera
"cam_props": {
"width": 640,
"height": 480,
"framerate": 30
},
},
"depends_on": [
"webcam-1",
"vision-service-1"
]
}
```
Expand All @@ -51,12 +47,31 @@ Make sure to configure a [Data Manager Service](https://docs.viam.com/services/d
"type": "data_manager",
"attributes": {
"tags": [],
"additional_sync_paths": [
"/home/viam/.viam/video-upload/"
],
"additional_sync_paths": [],
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Call out that this is custom, as stated in the scope, right now there's no clue as to why this is int he example config.

"capture_disabled": true,
"sync_interval_mins": 1,
"capture_dir": ""
}
}
```

## Commands

### `save`
```json
{
"command": "save",
"from": <start_timestamp>, [required]
"to": <end_timestamp>, [required]
"metadata": <arbitrary_metadata_string> [optional]
}
```

### `fetch`
```json
{
"command": "fetch",
"from": <start_timestamp>, [required]
"to": <end_timestamp>, [required]
}
```
107 changes: 79 additions & 28 deletions cam/cam.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,37 +19,43 @@ import (
)

// Model is the model for the video storage camera component.
// TODO(seanp): Personal module for now, should be movied to viam module in prod.
// TODO(seanp): Personal module for now, should be moved to viam module in prod.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's in viam-modules now.

var Model = resource.ModelNamespace("seanavery").WithFamily("video").WithModel("storage")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's okay to use the viam namespace, it's also okay to transfer a release later, just try to only upload release candidates to the registry for now.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, moved to viam namespace


const (
// Default values for the video storage camera component.
defaultSegmentSeconds = 30 // seconds
defaultStorageSize = 10 // GB
defaultVideoCodec = "h264"
defaultVideoBitrate = 1000000
defaultVideoPreset = "medium"
defaultVideoFormat = "mp4"
defaultLogLevel = "error"
defaultUploadPath = ".viam/video-upload"
defaultUploadPath = ".viam/capture/video-upload"
defaultStoragePath = ".viam/video-storage"

defaultLogLevel = "info"
deleterInterval = 60 // seconds
)

type videostore struct {
resource.AlwaysRebuild
resource.TriviallyCloseable
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have a Close implemented, which is what I expect since we're going on and on on a loop saving video. You can delete this line, which just returns nil when you call close.

Suggested change
resource.TriviallyCloseable


name resource.Name
conf *Config
logger logging.Logger
uploadPath string
name resource.Name
conf *Config
logger logging.Logger

cam camera.Camera
stream gostream.VideoStream

workers rdkutils.StoppableWorkers
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you bump rdk, which I suggest you do you'll hae to get this from goutils goutils.NewBackgroudnWorkers is the creator function.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched to latest NewBackgroundWorkes in goutils.


enc *encoder
seg *segmenter
enc *encoder
seg *segmenter
conc *concater

storagePath string
uploadPath string
}

type storage struct {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add omitemptys to things that aren't required in the config's nested strucst as well.

Let's add a test for Validation and construction at this point. It can simply test 1) invalid configuration and 2) that the constructor populates the struct with defaults without testing the VideoReaders construct-to-close functionality, only do 2) if it is easy to isolate and test the code with the expected defaults.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, wrote config_test.go to verify config validation. Still need to include data manager service cases.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The data manager service please do that in a follow up pr, it will be hacky and I'd rather get Fetch done first.

You will have to grab it from dependencies and validate in the constructor, Validate might not be able to get to that part of the config, it validates Attributes

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the config_test.go is sufficient for attributes, will review it, when you request again.

Expand Down Expand Up @@ -121,10 +127,10 @@ func newvideostore(
}

// TODO(seanp): make this configurable
// logLevel := lookupLogID(defaultLogLevel)
logLevel := lookupLogID("debug")
logLevel := lookupLogID(defaultLogLevel)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that we can add a log level to each resource individually now, do we need this in the module? Okay to keep if it helps you debug for now.

ffmppegLogLevel(logLevel)

// Create encoder to handle encoding of frames.
// TODO(seanp): Forcing h264 for now until h265 is supported.
if newConf.Video.Codec != "h264" {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's premptively factor this into an enum.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, added enum+handlers for codec type.

newConf.Video.Codec = defaultVideoCodec
Expand All @@ -138,7 +144,6 @@ func newvideostore(
if newConf.Video.Format == "" {
newConf.Video.Format = defaultVideoFormat
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the pattern of

s.thingy = defaultThingy
if newConf.Thingy != nil/empty{
    s.thingy = newConf.Thingy
}

But it might be messy here unless you make a new encoder with the logger first. You can also move this logic to the newSegmenter and newEncoder functions since you're passing in the config variables anyway.

}

vs.enc, err = newEncoder(
logger,
newConf.Video.Codec,
Expand All @@ -152,29 +157,40 @@ func newvideostore(
return nil, err
}

// Create segmenter to handle segmentation of video stream into clips.
if newConf.Storage.SegmentSeconds == 0 {
newConf.Storage.SegmentSeconds = defaultSegmentSeconds
}
if newConf.Storage.SizeGB == 0 {
newConf.Storage.SizeGB = defaultStorageSize
}
if newConf.Storage.UploadPath == "" {
newConf.Storage.UploadPath = filepath.Join(getHomeDir(), defaultUploadPath, vs.name.Name)
}
if newConf.Storage.StoragePath == "" {
newConf.Storage.StoragePath = filepath.Join(getHomeDir(), defaultStoragePath, vs.name.Name)
}
vs.seg, err = newSegmenter(logger, vs.enc, newConf.Storage.SizeGB, newConf.Storage.SegmentSeconds, newConf.Storage.StoragePath)
vs.storagePath = newConf.Storage.StoragePath
vs.seg, err = newSegmenter(
logger,
vs.enc,
newConf.Storage.SizeGB,
newConf.Storage.SegmentSeconds,
newConf.Storage.StoragePath,
)
if err != nil {
return nil, err
}

// Create concater to handle concatenation of video clips when requested.
vs.uploadPath = newConf.Storage.UploadPath
err = createDir(vs.uploadPath)
if err != nil {
return nil, err
}
vs.conc, err = newConcater(logger, vs.storagePath, vs.uploadPath, vs.name.Name)
if err != nil {
return nil, err
}

// Start workers to process frames and clean up storage.
vs.workers = rdkutils.NewStoppableWorkers(vs.processFrames, vs.deleter)

return vs, nil
Expand All @@ -185,7 +201,13 @@ func (cfg *Config) Validate(path string) ([]string, error) {
if cfg.Camera == "" {
return nil, utils.NewConfigValidationFieldRequiredError(path, "camera")
}

// Check Storage
if cfg.Storage == (storage{}) {
return nil, utils.NewConfigValidationFieldRequiredError(path, "storage")
}
if cfg.Storage.SizeGB == 0 {
return nil, utils.NewConfigValidationFieldRequiredError(path, "size_gb")
}
// TODO(seanp): Remove once camera properties are returned from camera component.
if cfg.Properties == (cameraProperties{}) {
return nil, utils.NewConfigValidationFieldRequiredError(path, "cam_props")
Expand All @@ -198,8 +220,39 @@ func (vs *videostore) Name() resource.Name {
return vs.name
}

func (vs *videostore) DoCommand(_ context.Context, _ map[string]interface{}) (map[string]interface{}, error) {
return nil, resource.ErrDoUnimplemented
// DoCommand processes the commands for the video storage camera component.
func (vs *videostore) DoCommand(_ context.Context, command map[string]interface{}) (map[string]interface{}, error) {
cmd, ok := command["command"].(string)
if !ok {
return nil, errors.New("invalid command type")
}

switch cmd {
// Save command is used to concatenate video clips between the given timestamps.
// The concatenated video file is then uploaded to the cloud the upload path.
// The response contains the name of the uploaded file.
case "save":
vs.logger.Debug("save command received")
from, to, metadata, err := validateSaveCommand(command)
if err != nil {
return nil, err
}
uploadFilePath, err := vs.conc.concat(from, to, metadata)
if err != nil {
vs.logger.Error("failed to concat files ", err)
return nil, err
}
uploadFileName := filepath.Base(uploadFilePath)
return map[string]interface{}{
"command": "save",
"file": uploadFileName,
}, nil
case "fetch":
vs.logger.Debug("fetch command received")
return nil, resource.ErrDoUnimplemented
default:
return nil, errors.New("invalid command")
}
}

func (vs *videostore) Images(_ context.Context) ([]camera.NamedImage, resource.ResponseMetadata, error) {
Expand Down Expand Up @@ -227,9 +280,7 @@ func (vs *videostore) Stream(_ context.Context, _ ...gostream.ErrorHandler) (gos
}

// processFrames reads frames from the camera, encodes, and writes to the segmenter
// which chuncks video stream into clip files inside the storage directory. This is
// meant for long term storage of video clips that are not necessarily triggered by
// detections.
// which chunks video stream into clip files inside the storage directory.
// TODO(seanp): Should this be throttled to a certain FPS?
func (vs *videostore) processFrames(ctx context.Context) {
for {
Expand Down Expand Up @@ -263,11 +314,11 @@ func (vs *videostore) processFrames(ctx context.Context) {
}
}

// deleter is a go routine that cleans up old clips if storage is full. It runs every
// minute and deletes the oldest clip until the storage size is below the max.
// deleter is a go routine that cleans up old clips if storage is full. Runs on interval
// and deletes the oldest clip until the storage size is below the configured max.
func (vs *videostore) deleter(ctx context.Context) {
// TODO(seanp): Using seconds for now, but should be minutes in prod.
ticker := time.NewTicker(60 * time.Second)
ticker := time.NewTicker(deleterInterval * time.Second)
defer ticker.Stop()
for {
select {
Expand All @@ -285,14 +336,14 @@ func (vs *videostore) deleter(ctx context.Context) {
}

// Close closes the video storage camera component.
// It closes the stream, workers, encoder, segmenter, and watcher.
func (vs *videostore) Close(ctx context.Context) error {
err := vs.stream.Close(ctx)
if err != nil {
return err
}
vs.workers.Stop()
vs.enc.Close()
vs.seg.Close()
vs.enc.close()
vs.seg.close()
vs.conc.close()
return nil
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see validation logic for the data manager being present in the configuration anywhere unless I missed it; that is part of the scope. However, I think that should be a separate ticket, I don't seeit mentioned in the epic's current tasks.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call - still need to add this. May need to amend the scope doc to include the data manager service as a mandatory dependency in the config.

}
Loading
Loading