diff --git a/.gitignore b/.gitignore index 2a4316e..eb4e57d 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ FFmpeg *.mp4 .env -*.DS_Store \ No newline at end of file +*.DS_Store +bin diff --git a/Makefile b/Makefile index 32959f9..0b020a7 100644 --- a/Makefile +++ b/Makefile @@ -19,15 +19,20 @@ FFMPEG_OPTS ?= --prefix=$(FFMPEG_BUILD) \ --disable-doc \ --disable-everything \ --enable-static \ - --enable-libx264 \ - --enable-gpl \ + --enable-libx264 \ + --enable-gpl \ --enable-encoder=libx264 \ --enable-muxer=segment \ --enable-muxer=mp4 \ --enable-demuxer=segment \ + --enable-demuxer=concat \ + --enable-demuxer=mov \ + --enable-demuxer=mp4 \ + --enable-parser=h264 \ --enable-protocol=file \ --enable-protocol=concat \ - --enable-protocol=crypto + --enable-protocol=crypto \ + --enable-bsf=h264_mp4toannexb # TODO: cleanup libx264 static link CGO_LDFLAGS := "-L$(FFMPEG_BUILD)/lib -lavcodec -lavutil -lavformat -l:libjpeg.a /usr/lib/aarch64-linux-gnu/libx264.a -lz" diff --git a/README.md b/README.md index e7c2a87..b93b99d 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ # Video Storage -The `video-store` module brings security camera functionality to your smart machine! The module consumes a source [Camera](https://docs.viam.com/components/camera/) and a [Vision Service](https://docs.viam.com/services/vision/), saves the camera output as video files to disk, and filters which video clips are uploaded to the cloud based on triggers from the vision service. +The `video-store` module brings security camera functionality to your smart machine! The module consumes a source [Camera](https://docs.viam.com/components/camera/) and saves the camera output as video files on disk. You can then later request to upload video slices to the cloud using the [save](#save) command, or request the video bytes directly using the [fetch](#fetch) command. > **Note:** This component is a work in progress and is not yet fully implemented. 
@@ -9,15 +9,15 @@ Fill in the attributes as applicable to the component, according to the example ```json { - "name": "fv-cam", + "name": "video-store-1", "namespace": "rdk", "type": "camera", - "model": "viam:camera:video-store", + "model": "viam:video:storage", "attributes": { - "camera": "webcam-1", // name of the camera to use - "vision": "vision-service-1", // name of the vision service dependency + "camera": "webcam-1", + "vision": "vision-service-1", "storage": { - "clip_seconds": 30, + "segment_seconds": 30, "size_gb": 100 }, "video": { @@ -26,10 +26,7 @@ Fill in the attributes as applicable to the component, according to the example "bitrate": 1000000, "preset": "medium" }, - "objects": { - "Person": 0.8 // label key and threshold value - }, - "cam_props": { // camera properties of the source camera + "cam_props": { "width": 640, "height": 480, "framerate": 30 @@ -37,7 +34,6 @@ Fill in the attributes as applicable to the component, according to the example }, "depends_on": [ "webcam-1", - "vision-service-1" ] } ``` @@ -51,12 +47,31 @@ Make sure to configure a [Data Manager Service](https://docs.viam.com/services/d "type": "data_manager", "attributes": { "tags": [], - "additional_sync_paths": [ - "/home/viam/.viam/video-upload/" - ], + "additional_sync_paths": [], "capture_disabled": true, "sync_interval_mins": 1, "capture_dir": "" } } ``` + +## Commands + +### `save` +```json +{ + "command": "save", + "from": <datetime>, [required] + "to": <datetime>, [required] + "metadata": <string> [optional] +} +``` + +### `fetch` +```json +{ + "command": "fetch", + "from": <datetime>, [required] + "to": <datetime> [required] +} +``` diff --git a/cam/cam.go b/cam/cam.go index 411ad60..d977f62 100644 --- a/cam/cam.go +++ b/cam/cam.go @@ -19,37 +19,43 @@ import ( ) // Model is the model for the video storage camera component. -// TODO(seanp): Personal module for now, should be movied to viam module in prod. +// TODO(seanp): Personal module for now, should be moved to viam module in prod. 
var Model = resource.ModelNamespace("seanavery").WithFamily("video").WithModel("storage") const ( + // Default values for the video storage camera component. defaultSegmentSeconds = 30 // seconds defaultStorageSize = 10 // GB defaultVideoCodec = "h264" defaultVideoBitrate = 1000000 defaultVideoPreset = "medium" defaultVideoFormat = "mp4" - defaultLogLevel = "error" - defaultUploadPath = ".viam/video-upload" + defaultUploadPath = ".viam/capture/video-upload" defaultStoragePath = ".viam/video-storage" + + defaultLogLevel = "info" + deleterInterval = 60 // seconds ) type videostore struct { resource.AlwaysRebuild resource.TriviallyCloseable - name resource.Name - conf *Config - logger logging.Logger - uploadPath string + name resource.Name + conf *Config + logger logging.Logger cam camera.Camera stream gostream.VideoStream workers rdkutils.StoppableWorkers - enc *encoder - seg *segmenter + enc *encoder + seg *segmenter + conc *concater + + storagePath string + uploadPath string } type storage struct { @@ -121,10 +127,10 @@ func newvideostore( } // TODO(seanp): make this configurable - // logLevel := lookupLogID(defaultLogLevel) - logLevel := lookupLogID("debug") + logLevel := lookupLogID(defaultLogLevel) ffmppegLogLevel(logLevel) + // Create encoder to handle encoding of frames. // TODO(seanp): Forcing h264 for now until h265 is supported. if newConf.Video.Codec != "h264" { newConf.Video.Codec = defaultVideoCodec @@ -138,7 +144,6 @@ func newvideostore( if newConf.Video.Format == "" { newConf.Video.Format = defaultVideoFormat } - vs.enc, err = newEncoder( logger, newConf.Video.Codec, @@ -152,29 +157,40 @@ func newvideostore( return nil, err } + // Create segmenter to handle segmentation of video stream into clips. 
if newConf.Storage.SegmentSeconds == 0 { newConf.Storage.SegmentSeconds = defaultSegmentSeconds } - if newConf.Storage.SizeGB == 0 { - newConf.Storage.SizeGB = defaultStorageSize - } if newConf.Storage.UploadPath == "" { newConf.Storage.UploadPath = filepath.Join(getHomeDir(), defaultUploadPath, vs.name.Name) } if newConf.Storage.StoragePath == "" { newConf.Storage.StoragePath = filepath.Join(getHomeDir(), defaultStoragePath, vs.name.Name) } - vs.seg, err = newSegmenter(logger, vs.enc, newConf.Storage.SizeGB, newConf.Storage.SegmentSeconds, newConf.Storage.StoragePath) + vs.storagePath = newConf.Storage.StoragePath + vs.seg, err = newSegmenter( + logger, + vs.enc, + newConf.Storage.SizeGB, + newConf.Storage.SegmentSeconds, + newConf.Storage.StoragePath, + ) if err != nil { return nil, err } + // Create concater to handle concatenation of video clips when requested. vs.uploadPath = newConf.Storage.UploadPath err = createDir(vs.uploadPath) if err != nil { return nil, err } + vs.conc, err = newConcater(logger, vs.storagePath, vs.uploadPath, vs.name.Name) + if err != nil { + return nil, err + } + // Start workers to process frames and clean up storage. vs.workers = rdkutils.NewStoppableWorkers(vs.processFrames, vs.deleter) return vs, nil @@ -185,7 +201,13 @@ func (cfg *Config) Validate(path string) ([]string, error) { if cfg.Camera == "" { return nil, utils.NewConfigValidationFieldRequiredError(path, "camera") } - + // Check Storage + if cfg.Storage == (storage{}) { + return nil, utils.NewConfigValidationFieldRequiredError(path, "storage") + } + if cfg.Storage.SizeGB == 0 { + return nil, utils.NewConfigValidationFieldRequiredError(path, "size_gb") + } // TODO(seanp): Remove once camera properties are returned from camera component. 
if cfg.Properties == (cameraProperties{}) { return nil, utils.NewConfigValidationFieldRequiredError(path, "cam_props") @@ -198,8 +220,39 @@ func (vs *videostore) Name() resource.Name { return vs.name } -func (vs *videostore) DoCommand(_ context.Context, _ map[string]interface{}) (map[string]interface{}, error) { - return nil, resource.ErrDoUnimplemented +// DoCommand processes the commands for the video storage camera component. +func (vs *videostore) DoCommand(_ context.Context, command map[string]interface{}) (map[string]interface{}, error) { + cmd, ok := command["command"].(string) + if !ok { + return nil, errors.New("invalid command type") + } + + switch cmd { + // Save command is used to concatenate video clips between the given timestamps. + // The concatenated video file is then uploaded to the cloud at the upload path. + // The response contains the name of the uploaded file. + case "save": + vs.logger.Debug("save command received") + from, to, metadata, err := validateSaveCommand(command) + if err != nil { + return nil, err + } + uploadFilePath, err := vs.conc.concat(from, to, metadata) + if err != nil { + vs.logger.Error("failed to concat files ", err) + return nil, err + } + uploadFileName := filepath.Base(uploadFilePath) + return map[string]interface{}{ + "command": "save", + "file": uploadFileName, + }, nil + case "fetch": + vs.logger.Debug("fetch command received") + return nil, resource.ErrDoUnimplemented + default: + return nil, errors.New("invalid command") + } } func (vs *videostore) Images(_ context.Context) ([]camera.NamedImage, resource.ResponseMetadata, error) { @@ -227,9 +280,7 @@ func (vs *videostore) Stream(_ context.Context, _ ...gostream.ErrorHandler) (gos } // processFrames reads frames from the camera, encodes, and writes to the segmenter -// which chuncks video stream into clip files inside the storage directory. This is -// meant for long term storage of video clips that are not necessarily triggered by -// detections. 
+// which chunks video stream into clip files inside the storage directory. // TODO(seanp): Should this be throttled to a certain FPS? func (vs *videostore) processFrames(ctx context.Context) { for { @@ -263,11 +314,11 @@ func (vs *videostore) processFrames(ctx context.Context) { } } -// deleter is a go routine that cleans up old clips if storage is full. It runs every -// minute and deletes the oldest clip until the storage size is below the max. +// deleter is a go routine that cleans up old clips if storage is full. Runs on interval +// and deletes the oldest clip until the storage size is below the configured max. func (vs *videostore) deleter(ctx context.Context) { // TODO(seanp): Using seconds for now, but should be minutes in prod. - ticker := time.NewTicker(60 * time.Second) + ticker := time.NewTicker(deleterInterval * time.Second) defer ticker.Stop() for { select { @@ -285,14 +336,14 @@ func (vs *videostore) deleter(ctx context.Context) { } // Close closes the video storage camera component. -// It closes the stream, workers, encoder, segmenter, and watcher. 
func (vs *videostore) Close(ctx context.Context) error { err := vs.stream.Close(ctx) if err != nil { return err } vs.workers.Stop() - vs.enc.Close() - vs.seg.Close() + vs.enc.close() + vs.seg.close() + vs.conc.close() return nil } diff --git a/cam/concater.go b/cam/concater.go new file mode 100644 index 0000000..0e0bbd8 --- /dev/null +++ b/cam/concater.go @@ -0,0 +1,210 @@ +package videostore + +/* +#include +#include +#include +*/ +import "C" + +import ( + "errors" + "fmt" + "os" + "path/filepath" + "time" + "unsafe" + + "go.viam.com/rdk/logging" ) + +const ( + concatTextFileName = "concat.txt" +) + +type concater struct { + logger logging.Logger + storagePath string + uploadPath string + camName string + concatFile *os.File +} + +func newConcater( + logger logging.Logger, + storagePath, uploadPath, camName string, +) (*concater, error) { + concatPath := filepath.Join(getHomeDir(), ".viam", concatTextFileName) + logger.Debugf("concatPath: %s", concatPath) + concatFile, err := os.Create(concatPath) + if err != nil { + logger.Error("failed to create concat file", err) + return nil, err + } + return &concater{ + logger: logger, + storagePath: storagePath, + uploadPath: uploadPath, + concatFile: concatFile, + camName: camName, + }, nil +} + +// concat takes in from and to timestamps and concatenates the video files between them. +// Returns the path to the concatenated video file. +func (c *concater) concat(from, to time.Time, metadata string) (string, error) { + // Find the storage files that match the concat query. 
+ storageFiles, err := getSortedFiles(c.storagePath) + if err != nil { + c.logger.Error("failed to get sorted files", err) + return "", err + } + if len(storageFiles) == 0 { + return "", errors.New("no video data in storage") + } + err = validateTimeRange(storageFiles, from, to) + if err != nil { + return "", err + } + matchingFiles := matchStorageToRange(storageFiles, from, to) + if len(matchingFiles) == 0 { + return "", errors.New("no matching video data to save") + } + + // Clear the concat file and write the matching files list to it. + c.concatFile.Truncate(0) + c.concatFile.Seek(0, 0) + for _, file := range matchingFiles { + _, err := c.concatFile.WriteString(fmt.Sprintf("file '%s'\n", file)) + if err != nil { + return "", err + } + } + + concatFilePath := C.CString(c.concatFile.Name()) + defer C.free(unsafe.Pointer(concatFilePath)) + concatStr := C.CString("concat") + defer C.free(unsafe.Pointer(concatStr)) + inputFormat := C.av_find_input_format(concatStr) + if inputFormat == nil { + return "", errors.New("failed to find input format") + } + + // Open the input format context with the concat demuxer. This block sets up + // the input format context to read the concatenated input files. It uses the + // concat demuxer with the 'safe' option set to '0' to allow absolute paths in + // the input file list. 
+ var options *C.AVDictionary + safeStr := C.CString("safe") + safeValStr := C.CString("0") + defer C.free(unsafe.Pointer(safeValStr)) + defer C.free(unsafe.Pointer(safeStr)) + defer C.av_dict_free(&options) + C.av_dict_set(&options, safeStr, safeValStr, 0) + var inputCtx *C.AVFormatContext + ret := C.avformat_open_input(&inputCtx, concatFilePath, inputFormat, &options) + if ret < 0 { + return "", fmt.Errorf("failed to open input format: %s", ffmpegError(ret)) + } + ret = C.avformat_find_stream_info(inputCtx, nil) + if ret < 0 { + return "", fmt.Errorf("failed to find stream info: %s", ffmpegError(ret)) + } + + // Open the output format context and write the header. This block sets up the + // output format context to write the concatenated video data to a new file. + var outputFilename string + fromStr := formatDateTimeToString(from) + if metadata == "" { + outputFilename = fmt.Sprintf("%s_%s.%s", c.camName, fromStr, defaultVideoFormat) + } else { + outputFilename = fmt.Sprintf("%s_%s_%s.%s", c.camName, fromStr, metadata, defaultVideoFormat) + } + outputPath := filepath.Join(c.uploadPath, outputFilename) + c.logger.Debug("outputPath", outputPath) + outputPathCStr := C.CString(outputPath) + defer C.free(unsafe.Pointer(outputPathCStr)) + var outputCtx *C.AVFormatContext + ret = C.avformat_alloc_output_context2(&outputCtx, nil, nil, outputPathCStr) + if ret < 0 { + return "", fmt.Errorf("failed to allocate output context: %s", ffmpegError(ret)) + } + + // Copy codec info from input to output context. This is necessary to ensure + // we do not decode and re-encode the video data. 
+ for i := 0; i < int(inputCtx.nb_streams); i++ { + inStream := *(**C.AVStream)( + unsafe.Pointer(uintptr(unsafe.Pointer(inputCtx.streams)) + + uintptr(i)*unsafe.Sizeof(inputCtx.streams))) + outStream := C.avformat_new_stream(outputCtx, nil) + if outStream == nil { + return "", fmt.Errorf("failed to allocate stream") + } + ret := C.avcodec_parameters_copy(outStream.codecpar, inStream.codecpar) + if ret < 0 { + return "", fmt.Errorf("failed to copy codec parameters: %s", ffmpegError(ret)) + } + // Let ffmpeg handle the codec tag for us. + outStream.codecpar.codec_tag = 0 + } + + // Open the output file and write the header. + ret = C.avio_open(&outputCtx.pb, outputPathCStr, C.AVIO_FLAG_WRITE) + if ret < 0 { + return "", fmt.Errorf("failed to open output file: %s", ffmpegError(ret)) + } + ret = C.avformat_write_header(outputCtx, nil) + if ret < 0 { + return "", fmt.Errorf("failed to write header: %s", ffmpegError(ret)) + } + + // Iterate through each packet in the input context and write it to the output context. + // TODO(seanp): We can hopefully optimize this by copying input segments entirely instead of packet by packet. + packet := C.av_packet_alloc() + for { + ret := C.av_read_frame(inputCtx, packet) + if ret == C.AVERROR_EOF { + c.logger.Debug("Concatenation complete. Hit EOF.") + break + } + // Any error other than EOF is a problem. + if ret < 0 { + return "", fmt.Errorf("failed to read frame: %s", ffmpegError(ret)) + } + // Adjust the PTS, DTS, and duration correctly for each packet. + // Can have multiple streams, so need to adjust each packet based on the + // timebase of the stream the packet belongs to. 
+ inStream := *(**C.AVStream)(unsafe.Pointer( + uintptr(unsafe.Pointer(inputCtx.streams)) + + uintptr(packet.stream_index)*unsafe.Sizeof(uintptr(0)))) + outStream := *(**C.AVStream)(unsafe.Pointer( + uintptr(unsafe.Pointer(outputCtx.streams)) + + uintptr(packet.stream_index)*unsafe.Sizeof(uintptr(0)))) + packet.pts = C.av_rescale_q_rnd(packet.pts, inStream.time_base, outStream.time_base, C.AV_ROUND_NEAR_INF|C.AV_ROUND_PASS_MINMAX) + packet.dts = C.av_rescale_q_rnd(packet.dts, inStream.time_base, outStream.time_base, C.AV_ROUND_NEAR_INF|C.AV_ROUND_PASS_MINMAX) + packet.duration = C.av_rescale_q(packet.duration, inStream.time_base, outStream.time_base) + packet.pos = -1 + ret = C.av_interleaved_write_frame(outputCtx, packet) + if ret < 0 { + return "", fmt.Errorf("failed to write frame: %s", ffmpegError(ret)) + } + } + + // Write the trailer, close the output file, and free context memory. + ret = C.av_write_trailer(outputCtx) + if ret < 0 { + return "", fmt.Errorf("failed to write trailer: %s", ffmpegError(ret)) + } + // FFmpeg methods handle null pointers, so no need to check for nil. 
+ C.avio_closep(&outputCtx.pb) + C.avformat_close_input(&inputCtx) + C.avformat_free_context(outputCtx) + C.av_packet_free(&packet) + + return outputPath, nil +} + +func (c *concater) close() { + c.concatFile.Close() + os.Remove(c.concatFile.Name()) +} diff --git a/cam/encoder.go b/cam/encoder.go index 26a564a..b5bb818 100644 --- a/cam/encoder.go +++ b/cam/encoder.go @@ -158,7 +158,7 @@ func (e *encoder) encode(frame image.Image) ([]byte, int64, int64, error) { return encodedData, pts, dts, nil } -func (e *encoder) Close() { +func (e *encoder) close() { C.avcodec_close(e.codecCtx) C.av_frame_free(&e.srcFrame) C.avcodec_free_context(&e.codecCtx) diff --git a/cam/segmenter.go b/cam/segmenter.go index 5f7481b..70f9c07 100644 --- a/cam/segmenter.go +++ b/cam/segmenter.go @@ -187,7 +187,7 @@ func (s *segmenter) cleanupStorage() error { // Close closes the segmenter and writes the trailer to prevent corruption // when exiting early in the middle of a segment. -func (s *segmenter) Close() { +func (s *segmenter) close() { ret := C.av_write_trailer(s.outCtx) if ret < 0 { s.logger.Errorf("failed to write trailer", "error", ffmpegError(ret)) diff --git a/cam/utils.go b/cam/utils.go index 6b8d7e0..d75864d 100644 --- a/cam/utils.go +++ b/cam/utils.go @@ -4,10 +4,13 @@ package videostore #include #include #include +#include +#include */ import "C" import ( + "errors" "fmt" "io" "os" @@ -22,7 +25,7 @@ func ffmpegError(ret C.int) string { var errbuf [256]C.char C.av_strerror(ret, &errbuf[0], 256) if errbuf[0] == 0 { - return "unknown error" + return "unknown ffmpeg error" } return C.GoString(&errbuf[0]) } @@ -117,8 +120,8 @@ func getSortedFiles(path string) ([]string, error) { filePaths = append(filePaths, filepath.Join(path, file.Name())) } sort.Slice(filePaths, func(i, j int) bool { - timeI, errI := extractDateTime(filePaths[i]) - timeJ, errJ := extractDateTime(filePaths[j]) + timeI, errI := extractDateTimeFromFilename(filePaths[i]) + timeJ, errJ := 
extractDateTimeFromFilename(filePaths[j]) if errI != nil || errJ != nil { return false } @@ -128,23 +131,50 @@ func getSortedFiles(path string) ([]string, error) { return filePaths, nil } -// extractDateTime extracts the date and time from the file name. -func extractDateTime(filePath string) (time.Time, error) { +// extractDateTimeFromFilename extracts the date and time from the filename. +func extractDateTimeFromFilename(filePath string) (time.Time, error) { baseName := filepath.Base(filePath) parts := strings.Split(baseName, "_") if len(parts) < 2 { return time.Time{}, fmt.Errorf("invalid file name: %s", baseName) } - datePart := parts[1] - timePart := strings.TrimSuffix(parts[1], filepath.Ext(baseName)) + datePart := parts[0] + timePart := strings.TrimSuffix(parts[1], filepath.Ext(parts[1])) dateTimeStr := datePart + "_" + timePart - dateTime, err := time.Parse("2006-01-02_15-04-05", dateTimeStr) + return parseDateTimeString(dateTimeStr) +} + +// parseDateTimeString parses a date and time string in the format "2006-01-02_15-04-05". +// Returns a time.Time object and an error if the string is not in the correct format. +func parseDateTimeString(datetime string) (time.Time, error) { + dateTime, err := time.Parse("2006-01-02_15-04-05", datetime) if err != nil { return time.Time{}, err } return dateTime, nil } +func formatDateTimeToString(dateTime time.Time) string { + return dateTime.Format("2006-01-02_15-04-05") +} + +// matchStorageToRange returns a list of files that fall within the provided time range. +// TODO(seanp): This should also include trimming offsets to the file list. +func matchStorageToRange(files []string, start, end time.Time) []string { + var matchedFiles []string + for _, file := range files { + dateTime, err := extractDateTimeFromFilename(file) + if err != nil { + continue + } + // !dateTime.Before(start) allows us to include the start time in the range inclusively. 
+ if !dateTime.Before(start) && dateTime.Before(end) { + matchedFiles = append(matchedFiles, file) + } + } + return matchedFiles +} + // copyFile copies a file from the source to the destination. func copyFile(src, dst string) error { sourceFile, err := os.Open(src) @@ -168,11 +198,52 @@ func copyFile(src, dst string) error { return nil } -// fetchCompName -func fetchCompName(resourceName string) string { - parts := strings.Split(resourceName, "/") - if len(parts) > 0 { - return parts[len(parts)-1] +// validateTimeRange validates the start and end time range against storage files. +// Extracts the start timestamp of the oldest file and the start of the most recent file. +// Since the most recent segment file is still being written to by the segmenter +// we do not want to include it in the time range. +func validateTimeRange(files []string, start, end time.Time) error { + if len(files) == 0 { + return errors.New("no storage files found") + } + oldestFileStart, err := extractDateTimeFromFilename(files[0]) + if err != nil { + return err + } + newestFileStart, err := extractDateTimeFromFilename(files[len(files)-1]) + if err != nil { + return err + } + if start.Before(oldestFileStart) || end.After(newestFileStart) { + return errors.New("time range is outside of storage range") + } + return nil +} + +// validateSaveCommand validates the save command params and checks for valid time format. 
+func validateSaveCommand(command map[string]interface{}) (time.Time, time.Time, string, error) { + fromStr, ok := command["from"].(string) + if !ok { + return time.Time{}, time.Time{}, "", errors.New("from timestamp not found") + } + from, err := parseDateTimeString(fromStr) + if err != nil { + return time.Time{}, time.Time{}, "", err + } + toStr, ok := command["to"].(string) + if !ok { + return time.Time{}, time.Time{}, "", errors.New("to timestamp not found") + } + to, err := parseDateTimeString(toStr) + if err != nil { + return time.Time{}, time.Time{}, "", err + } + if from.After(to) { + return time.Time{}, time.Time{}, "", errors.New("from timestamp is after to timestamp") + } + metadata, ok := command["metadata"].(string) + if !ok { + metadata = "" } - return "unknown" + return from, to, metadata, nil }