Skip to content

Commit

Permalink
feature: 分片上传增加并发上传能力 (#79)
Browse files Browse the repository at this point in the history
  • Loading branch information
arrebole authored Jun 27, 2023
1 parent 14076d5 commit f4cbdc9
Show file tree
Hide file tree
Showing 15 changed files with 530 additions and 501 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
uses: golangci/golangci-lint-action@v3
with:
# Optional: golangci-lint command line arguments.
version: v1.47.3
version: v1.52.2
args:
# Optional: working directory, useful for monorepos
# working-directory: somedir
Expand Down
55 changes: 27 additions & 28 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ Table of Contents
* [CopyObjectConfig](#copyobjectconfig)
* [FormUploadConfig](#formuploadconfig)
* [CommitTasksConfig](#committasksconfig)
* [BreakPointConfig](#breakpointconfig)
* [LiveauditCreateTask](#liveauditcreatetask)
* [LiveauditCancelTask](#liveauditcanceltask)
* [SyncCommonTask](#synccommontask)
Expand Down Expand Up @@ -90,10 +89,6 @@ func main() {
LocalPath: "/tmp/upload",
}))

// 断点续传 文件大于 10M 才会分片
resume := &MemoryRecorder{}
// 若设置为 nil,则为正常的分片上传
up.SetRecorder(resume)
fmt.Println(up.Put(&upyun.PutObjectConfig{
Path: "/demo.log",
LocalPath: "/tmp/upload",
Expand Down Expand Up @@ -303,19 +298,39 @@ type PutObjectConfig struct {
Reader io.Reader // 待上传的内容
Headers map[string]string // 额外的 HTTP 请求头
UseMD5 bool // 是否需要 MD5 校验
UseResumeUpload bool // 是否使用断点续传
AppendContent bool // 是否需要追加文件内容
ResumePartSize int64 // 断点续传块大小
MaxResumePutTries int // 断点续传最大重试次数
MultipartUpload bool // 开启自动分片上传,
MultipartUploadWorkers int // 分片上传的线程数,只有开启分片上传时,才有效
MultipartUploadCheckpoint bool // 分片上传时,开启断点续传
OnProgress func(fsize, offset, increase int64) // 分片上传进度变化时 会被调用
}
```

`PutObjectConfig` 提供上传单个文件所需的参数。有几点需要注意:
- `LocalPath``Reader` 是互斥的关系,如果设置了 `LocalPath`,SDK 就会去读取这个文件,而忽略 `Reader` 中的内容。
- 如果 `Reader` 是一个流/缓冲等的话,需要通过 `Headers` 参数设置 `Content-Length`,SDK 默认会对 `*os.File` 增加该字段。
- [断点续传](https://docs.upyun.com/api/rest_api/#_3)的上传内容类型必须是 `*os.File`, 断点续传会将文件按照 `ResumePartSize` 进行切割,然后按次序一块一块上传,如果遇到网络问题,会进行重试,重试 `MaxResumePutTries` 次,默认无限重试。
- `AppendContent` 如果是追加文件的话,确保非最后的分片必须为 1M 的整数倍。
- 如果需要 MD5 校验,SDK 对 `*os.File` 会自动计算 MD5 值,其他类型需要自行通过 `Headers` 参数设置 `Content-MD5`
- [断点续传](https://docs.upyun.com/api/rest_api/#_3)的上传内容类型必须是 `*os.File`, 断点续传会将文件进行切割,然后按次序一块一块上传,如果遇到网络问题,会进行重试。
- 如果需要 MD5 校验,SDK 对 `*os.File` 会自动计算 MD5 值,其他类型需要自行通过 `Headers` 参数设置 `Content-MD5`, 开启多线程上传时,无法校验 MD5。


获取上传进度
```GO
config := &PutObjectConfig{
Path: path,
LocalPath: fname,
UseMD5: false,
Headers: make(map[string]string),
MultipartUpload: true,
MultipartUploadWorkers: 2,
MultipartUploadCheckpoint: true,
}

var count int64
config.OnProgress = func(fsize, offset, increase int64) {
atomic.AddInt64(&count, increase)
fmt.Printf("当前进度 %d%\n", count*100/(fsize-offset))
}

```


#### GetObjectConfig
Expand Down Expand Up @@ -439,22 +454,6 @@ type CommitTasksConfig struct {

`CommitTasksConfig` 提供提交异步任务所需的参数。`Accept``Source` 仅与异步音视频处理有关。`Tasks` 是一个任务数组,数组中的每一个元素都是任务相关的参数(一般情况下为字典类型)。


#### BreakPointConfig

```go
type BreakPointConfig struct {
UploadID string
PartID int // 失败待上传的 PartID
PartSize int64
FileSize int64
FileModTime time.Time
LastTime time.Time
}
```

`BreakPointConfig` 提供续传所需的参数,在使用 `Put` 进行断点续传时,首先使用 `UpYun``SetRecorder` 传入一个 `Recorder` 接口类型(内部使用 `MemoryRecorder` 实现了该接口),当出现上传失败的时候,调用 `UpYun` 下的 `Recorder``Get` 方法,即可获取当前断点的相关信息。

#### LiveauditCreateTask

```go
Expand Down
12 changes: 12 additions & 0 deletions upyun/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,15 @@ func errorOperation(op string, err error) error {
}
return fmt.Errorf("%s: %w", op, err)
}

// 分片重复上传错误
func IsDuplicatePart(err error) bool {
if err != nil {
return false
}

if e, ok := err.(*Error); ok && e.Code == 40011061 {
return true
}
return false
}
12 changes: 6 additions & 6 deletions upyun/fileinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ type FileInfo struct {
}

/*
Content-Type: image/gif
ETag: "dc9ea7257aa6da18e74505259b04a946"
x-upyun-file-type: GIF
x-upyun-height: 379
x-upyun-width: 500
x-upyun-frames: 90
Content-Type: image/gif
ETag: "dc9ea7257aa6da18e74505259b04a946"
x-upyun-file-type: GIF
x-upyun-height: 379
x-upyun-width: 500
x-upyun-frames: 90
*/
func parseHeaderToFileInfo(header http.Header, getinfo bool) *FileInfo {
fInfo := &FileInfo{}
Expand Down
137 changes: 137 additions & 0 deletions upyun/part.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package upyun

import (
"context"
"errors"
"io"
"sync"
)

type chunk struct {
Id int
data []byte
}

type multipartUploader struct {
reader io.ReadSeeker
config *PutObjectConfig
skiper *skiper
partSize int64

// 多线程上传任务队列
queue chan *chunk

// 记录上传时的错误
errout chan error

wg sync.WaitGroup
}

func createMultipartUploader(
reader io.ReadSeeker,
config *PutObjectConfig,
skiper *skiper,
partSize int64,
) *multipartUploader {
return &multipartUploader{
reader: reader,
config: config,
skiper: skiper,
partSize: partSize,
}
}

func errorJoin(c chan error) error {
if len(c) == 0 {
return nil
}
s := make([]error, 0)
for i := range c {
s = append(s, i)
}
return errors.Join(s...)
}

func (p *multipartUploader) product(ctx context.Context) {
defer close(p.queue)

// 跳过已经上传的文件前一部分
var partId int64
if p.skiper != nil {
partId = p.skiper.FirstMissPartId()
if _, err := p.reader.Seek(partId*p.partSize, io.SeekCurrent); err != nil {
p.errout <- err
return
}
}

for {
select {
case <-ctx.Done():
return
default:
buffer := make([]byte, p.partSize)
n, err := p.reader.Read(buffer)
if err != nil {
if err != io.EOF {
p.errout <- err
}
return
}

// 如果分片已经存在,则跳过
if p.skiper != nil && p.skiper.IsSkip(partId) {
partId++
continue
}

p.queue <- &chunk{
Id: int(partId),
data: buffer[:n],
}
partId++
}
}
}

func (p *multipartUploader) work(ctx context.Context, cancel context.CancelFunc, fn func(id int, data []byte) error) {
defer p.wg.Done()
for {
select {
case <-ctx.Done():
return
case ch, ok := <-p.queue:
if !ok {
return
}
if err := fn(ch.Id, ch.data); err != nil {
p.errout <- err
cancel()
return
}
}
}
}

func (p *multipartUploader) Go(fn func(id int, data []byte) error) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// 由于需要包含product的错误所以需要+1
p.errout = make(chan error, p.config.MultipartUploadWorkers+1)
p.queue = make(chan *chunk, p.config.MultipartUploadWorkers)

// 生成任务
go p.product(ctx)

// 消费任务
for i := 0; i < p.config.MultipartUploadWorkers; i++ {
p.wg.Add(1)
go p.work(ctx, cancel, fn)
}

p.wg.Wait()
close(p.errout)

return errorJoin(p.errout)
}
56 changes: 56 additions & 0 deletions upyun/part_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package upyun

import (
"bytes"
"testing"
)

func TestMultipartUploader(t *testing.T) {
uploader := createMultipartUploader(
bytes.NewReader([]byte("hello world")),
&PutObjectConfig{
MultipartUploadWorkers: 4,
},
nil,
1,
)

var result = make([]byte, 11)
uploader.Go(func(id int, data []byte) error {
Equal(t, 1, len(data))
result[id] = data[0]
return nil
})
Equal(t, string(result), "hello world")
}

func TestMultipartUploaderSeek(t *testing.T) {
payload := []byte("hello world")
uploader := createMultipartUploader(
bytes.NewReader(payload),
&PutObjectConfig{
MultipartUploadWorkers: 4,
},
createSkiper(
2,
[]*DisorderPart{
&DisorderPart{ID: 0, Size: 1},
&DisorderPart{ID: 1, Size: 1},
&DisorderPart{ID: 3, Size: 1},
&DisorderPart{ID: 5, Size: 1},
&DisorderPart{ID: 9, Size: 1},
&DisorderPart{ID: 10, Size: 1},
},
),
1,
)

var result = []byte{'h', 'e', 0, 'l', 0, ' ', 0, 0, 0, 'l', 'd'}
uploader.Go(func(id int, data []byte) error {
Equal(t, 1, len(data))
result[id] = data[0]
return nil
})

Equal(t, string(result), "hello world")
}
1 change: 0 additions & 1 deletion upyun/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ func (up *UpYun) CommitTasks(config *CommitTasksConfig) (taskIds []string, err e
if config.Accept != "" {
kwargs["accept"] = config.Accept
}

err = up.doProcessRequest("POST", "/pretreatment/", kwargs, &taskIds)
return
}
Expand Down
5 changes: 2 additions & 3 deletions upyun/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ func TestSpider(t *testing.T) {
NotifyUrl: NOTIFY_URL,
Tasks: []interface{}{task},
})

Nil(t, err)
Equal(t, len(ids), 1)
}
Expand Down Expand Up @@ -69,7 +68,7 @@ func TestNagaResult(t *testing.T) {
Equal(t, len(res), 2)
}

//由于是异步操作,不能确保文件已存在
// 由于是异步操作,不能确保文件已存在
func TestImgaudit(t *testing.T) {
task := map[string]interface{}{
"url": JPG_URL,
Expand Down Expand Up @@ -99,7 +98,7 @@ func TestImgaudit(t *testing.T) {

}

//由于是异步操作,不能确保文件已存在
// 由于是异步操作,不能确保文件已存在
func TestVideoaudit(t *testing.T) {
task := map[string]interface{}{
"url": MP4_URL,
Expand Down
Loading

0 comments on commit f4cbdc9

Please sign in to comment.