I have been using qiniu's storage service to generate video thumbnails, blurred covers, and webp images. Now I need to move the storage to S3, so I have to generate these images myself. This article sorts out the general approach.
Analyze the requirements
First, let's take a look at how qiniu's interface processes images. For example, to capture the frame at the first second of a video, shrink it to a thumbnail, and finally save it under a new key, the command can be written like this: vframe/jpg/offset/1|imageMogr2/thumbnail/400x|saveas/xxx
You can see that the three operations are separated by the | symbol, much like a Unix pipe.
The whole chain above counts as one cmd. A single API request can carry multiple cmds at the same time, separated by semicolons. After processing, the results are returned in a callback, for example:
{
    "id": "xxxxx",
    "pipeline": "xxx",
    "code": 0,
    "desc": "The fop was completed successfully",
    "reqid": "xTsAAFnxUbR5J10U",
    "inputBucket": "xxx",
    "inputKey": "xxxxx",
    "items": [
        {
            "cmd": "vframe/jpg/offset/1|imageMogr2/thumbnail/400x|saveas/ZmFtZS1wcml2YXRlOm1vbWVudC9jb3Zlci9zbmFwL3ZpZGVvL2M5YzdjZjQ5LTU3NGQtNGZjMS1iZDFkLTRkYjZkMzlkZWY1Ni8wLzA=",
            "code": 0,
            "desc": "The fop was completed successfully",
            "hash": "FhdN6V8EI4vW4XJGALSfxutvMEIv",
            "key": "xx",
            "returnOld": 0
        },
        {
            "cmd": "vframe/jpg/offset/1|imageMogr2/thumbnail/400x|imageMogr2/blur/45x8|saveas/ZmFtZS1wcml2YXRlOm1vbWVudC9jb3Zlci9zbmFwL3ZpZGVvL2M5YzdjZjQ5LTU3NGQtNGZjMS1iZDFkLTRkYjZkMzlkZWY1Ni8wLzBfYmx1cg==",
            "code": 0,
            "desc": "The fop was completed successfully",
            "hash": "FgNiRzrCsa7TZx1xVSb_4d5TiaK3",
            "key": "xxx",
            "returnOld": 0
        }
    ]
}
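For reference, here is a minimal sketch of Go structs that such a callback body could be unmarshalled into. The field names are taken directly from the JSON above; the type names and the small demo are my own:

package main

import (
	"encoding/json"
	"fmt"
)

// CallbackItem mirrors one entry of "items" in the callback above.
type CallbackItem struct {
	Cmd       string `json:"cmd"`
	Code      int    `json:"code"`
	Desc      string `json:"desc"`
	Hash      string `json:"hash"`
	Key       string `json:"key"`
	ReturnOld int    `json:"returnOld"`
}

// Callback mirrors the whole callback body shown above.
type Callback struct {
	ID          string         `json:"id"`
	Pipeline    string         `json:"pipeline"`
	Code        int            `json:"code"`
	Desc        string         `json:"desc"`
	Reqid       string         `json:"reqid"`
	InputBucket string         `json:"inputBucket"`
	InputKey    string         `json:"inputKey"`
	Items       []CallbackItem `json:"items"`
}

func main() {
	body := []byte(`{"id":"xxxxx","code":0,"items":[{"cmd":"vframe/jpg/offset/1","code":0}]}`)
	var cb Callback
	if err := json.Unmarshal(body, &cb); err != nil {
		panic(err)
	}
	fmt.Println(cb.Items[0].Cmd) // vframe/jpg/offset/1
}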
Decomposition of requirements
This program roughly needs the following parts:
1. An HTTP interface accepts the task and, after accepting it, throws the task onto a queue and returns a job ID. Workers handle tasks asynchronously; the number of workers and the degree of parallelism of each worker are configurable, and workers have a retry mechanism.
2. Parse the work to be done from the job payload and parse each cmd. Ideally the cmds are executed in parallel, and the result of each cmd is recorded.
3. Each cmd contains multiple operations connected by a pipe: the output of the previous operation is the input of the next.
Parts 1, 2, and 3 can be looked at separately. Part 1 is relatively independent: I wrote a worker model before, following the article Handling 1 Million Requests per Minute with Go, which is quite detailed and uses a Go channel as the queue. I added beanstalk as a queue provider. Another improvement: that article only exposes a setting for the number of workers, so I added a parameter to set how many goroutines each worker can run in parallel, as sketched below. The rest of this post mainly talks about solutions for parts 3 and 2.
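As a rough illustration of those two knobs (this is a minimal sketch with assumed names, not the actual code from that article or from my worker): numWorkers controls how many workers run, and parallelPerWorker limits how many goroutines each worker may use at once via a semaphore channel. Jobs arrive here on a plain channel standing in for the queue.

package main

import (
	"fmt"
	"sync"
)

// worker pulls jobs from the shared queue and processes up to
// `parallel` jobs concurrently, limited by a semaphore channel.
func worker(id int, jobs <-chan string, parallel int, wg *sync.WaitGroup) {
	defer wg.Done()
	sem := make(chan struct{}, parallel)
	var inner sync.WaitGroup
	for job := range jobs {
		sem <- struct{}{} // acquire a slot
		inner.Add(1)
		go func(j string) {
			defer func() { <-sem; inner.Done() }() // release the slot
			fmt.Printf("worker %d handling %s\n", id, j)
		}(job)
	}
	inner.Wait()
}

func main() {
	const numWorkers = 3        // how many workers
	const parallelPerWorker = 4 // goroutines each worker may run at once

	jobs := make(chan string, 100)
	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go worker(i, jobs, parallelPerWorker, &wg)
	}
	for i := 0; i < 10; i++ {
		jobs <- fmt.Sprintf("job-%d", i)
	}
	close(jobs)
	wg.Wait()
}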
Pipe
You can refer to this library pipe, and the usage is as follows:
// Note: the names below (pipe.Line, pipe.Output, fetch, saveImage) are
// reconstructed; the originals were lost in formatting. resize and blur
// are user-defined pipe stages.
p := pipe.Line(
	fetch(""), // source stage; the original argument was elided
	resize(300, 300),
	blur(0.5),
)
output, err := pipe.Output(p)
if err != nil {
	fmt.Printf("%v\n", err)
}
buf := bytes.NewReader(output)
img, _ := png.Decode(buf)
saveImage(img, "test_a.png")
It is quite convenient. Create a Cmd struct, use regular expressions to match the parameters of each Operation, collect them into a []Op slice, and finally execute them in order (a sketch of the parsing step follows the usage example below). The struct and methods look like this:
type Cmd struct {
	cmd    string
	saveas string
	ops    []Op
	err    error
}

// Op turns one operation into a pipe stage
// (pipe.Pipe is the reconstructed return type).
type Op interface {
	getPipe() pipe.Pipe
}

type ResizeOp struct {
	width, height int
}

func (c ResizeOp) getPipe() pipe.Pipe {
	return resize(c.width, c.height)
}
// How to use (parse and doOps are assumed method names; the originals were lost)
cmdStr := `file/|thumbnail/x300|blur/20x8`
cmd := Cmd{cmdStr, "test_b.png", nil, nil}
cmd.parse() // split the cmd string and build the []Op slice
cmd.doOps() // run the ops as a pipeline and save the result
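As referenced above, here is a minimal sketch of what the parsing step could look like: split the cmd string on |, then use regular expressions to pull the parameters out of each operation. The regex patterns, the BlurOp type, and the helper names are my own assumptions, kept consistent with the cmd strings used in this post:

package main

import (
	"fmt"
	"regexp"
	"strconv"
	"strings"
)

type Op interface{}

type ResizeOp struct{ width, height int }
type BlurOp struct{ radius, sigma int }

var (
	thumbnailRe = regexp.MustCompile(`^thumbnail/(\d*)x(\d*)$`) // e.g. thumbnail/x300
	blurRe      = regexp.MustCompile(`^blur/(\d+)x(\d+)$`)      // e.g. blur/20x8
)

func atoi(s string) int { n, _ := strconv.Atoi(s); return n }

// parseOps turns "file/|thumbnail/x300|blur/20x8" into a slice of Ops.
func parseOps(cmd string) ([]Op, error) {
	var ops []Op
	for _, part := range strings.Split(cmd, "|") {
		switch {
		case strings.HasPrefix(part, "file/"):
			continue // the source of the pipeline, not an Op
		case thumbnailRe.MatchString(part):
			m := thumbnailRe.FindStringSubmatch(part)
			ops = append(ops, ResizeOp{width: atoi(m[1]), height: atoi(m[2])})
		case blurRe.MatchString(part):
			m := blurRe.FindStringSubmatch(part)
			ops = append(ops, BlurOp{radius: atoi(m[1]), sigma: atoi(m[2])})
		default:
			return nil, fmt.Errorf("unknown op: %s", part)
		}
	}
	return ops, nil
}

func main() {
	ops, err := parseOps(`file/|thumbnail/x300|blur/20x8`)
	fmt.Println(ops, err) // [{0 300} {20 8}] <nil>
}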
With a single cmd handled, what remains is running multiple cmds in parallel. There is not much to think about here: sync.WaitGroup solves it directly. Step by step, let's first look at how to use it:
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	cmds := []string{}
	for i := 0; i < 10000; i++ {
		cmds = append(cmds, fmt.Sprintf("cmd-%d", i))
	}
	results := handleCmds(cmds)
	fmt.Println(len(results)) // 10000
}

func doCmd(cmd string) string {
	return fmt.Sprintf("cmd=%s", cmd)
}

func handleCmds(cmds []string) (results []string) {
	fmt.Println(len(cmds)) // 10000
	var count uint64
	group := sync.WaitGroup{}
	lock := sync.Mutex{}
	for _, item := range cmds {
		// Add one to the counter
		group.Add(1)
		go func(cmd string) {
			result := doCmd(cmd)
			atomic.AddUint64(&count, 1)
			lock.Lock()
			results = append(results, result)
			lock.Unlock()
			// Count down by one
			group.Done()
		}(item)
	}
	// Block until the counter drops to zero
	group.Wait()
	fmt.Printf("count=%d \n", count) // 10000
	return
}
A WaitGroup is essentially a counter: while the count is greater than 0, Wait() blocks, and it returns once the count drops to 0. Another thing to note is that results = append(results, result) is not thread-safe: results is shared across goroutines and must be protected by the lock, otherwise the final len(results) will not be 10000.
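As a design note, the lock can also be avoided by having each goroutine send its result into a buffered channel and collecting the results in a single place. A minimal sketch of that alternative (not what the code above uses):

package main

import (
	"fmt"
	"sync"
)

func doCmd(cmd string) string { return "cmd=" + cmd }

func handleCmds(cmds []string) []string {
	out := make(chan string, len(cmds)) // buffered, so senders never block
	var wg sync.WaitGroup
	for _, cmd := range cmds {
		wg.Add(1)
		go func(c string) {
			defer wg.Done()
			out <- doCmd(c) // channel sends are safe from multiple goroutines
		}(cmd)
	}
	wg.Wait()
	close(out)

	results := make([]string, 0, len(cmds))
	for r := range out {
		results = append(results, r) // only one goroutine appends, no lock needed
	}
	return results
}

func main() {
	cmds := make([]string, 0, 10000)
	for i := 0; i < 10000; i++ {
		cmds = append(cmds, fmt.Sprintf("cmd-%d", i))
	}
	fmt.Println(len(handleCmds(cmds))) // 10000
}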
We build a BenchCmd to store the cmds, as follows:
type BenchCmd struct {
	cmds      []Cmd
	waitGroup sync.WaitGroup
	errs      []error
	lock      sync.Mutex
}

func (b *BenchCmd) doCmds() {
	for _, item := range b.cmds {
		b.waitGroup.Add(1)
		go func(cmd Cmd) {
			cmd.parse() // assumed method names, as above
			err := cmd.doOps()
			b.lock.Lock()
			b.errs = append(b.errs, err)
			b.lock.Unlock()
			b.waitGroup.Done()
		}(item)
	}
	b.waitGroup.Wait()
}
The final call looks like this:
var cmds []Cmd
cmd_a := Cmd{`file/|thumbnail/x300|blur/20x8`, "test_a.png", nil, nil}
cmd_b := Cmd{`file/|thumbnail/500x1000|blur/20x108`, "test_b.png", nil, nil}
cmd_c := Cmd{`file/|thumbnail/300x300`, "test_c.png", nil, nil}
cmds = append(cmds, cmd_a)
cmds = append(cmds, cmd_b)
cmds = append(cmds, cmd_c)

bench := BenchCmd{
	cmds:      cmds,
	waitGroup: sync.WaitGroup{},
	lock:      sync.Mutex{},
}
bench.doCmds()
fmt.Println(bench.errs) // inspect each cmd's error (the original call here was lost)
This is only a preliminary experiment; the thinking is not comprehensive enough, and it merely imitates the API. Qiniu presumably does not do it this way, and their coupling is lower: perhaps each Cmd is processed by its own cluster, which the pipe approach cannot handle for now, since its current limitation is that all operations of a Cmd must run inside one process.