woj-server/internal/service/runner/container.go

143 lines
3.5 KiB
Go

package runner
import (
"context"
"fmt"
"git.0x7f.app/WOJ/woj-server/pkg/file"
"git.0x7f.app/WOJ/woj-server/pkg/utils"
"github.com/containerd/containerd"
"github.com/containerd/containerd/cio"
"github.com/containerd/containerd/oci"
"github.com/opencontainers/runtime-spec/specs-go"
"go.uber.org/zap"
"io"
"os"
"syscall"
"time"
)
type ProgramArgs struct {
Args []string
Env []string
}
type RuntimeArgs struct {
Image string
Pid int64
Memory uint64
Timeout time.Duration
Mount []specs.Mount
}
func (r *RuntimeArgs) Normalize() {
r.Pid = utils.If(r.Pid <= 0, 64, r.Pid)
r.Memory = utils.If(r.Memory <= 0, 128*1024*1024, r.Memory)
r.Timeout = utils.If(r.Timeout <= 0, time.Minute, r.Timeout)
}
type IOArgs struct {
Output *os.File
// Limit is the max size of output in chars.
// if Limit = 0, output to stderr if verbose, discard output if not.
// if Limit < 0, discard output.
Limit int64
}
type RunArgs struct {
Program ProgramArgs
Runtime RuntimeArgs
IO IOArgs
}
func (s *service) ContainerRun(arg *RunArgs) error {
identifier := fmt.Sprintf("%d", s.container.count.Add(1))
// prepare args
arg.Runtime.Normalize()
// prepare output
var writer io.Writer = nil
if arg.IO.Limit == 0 && s.verbose {
writer = os.Stderr
} else if arg.IO.Limit > 0 && arg.IO.Output != nil {
writer = &file.LimitedWriter{
File: arg.IO.Output,
Limit: arg.IO.Limit,
}
}
// debug log
s.log.Debug("container started", zap.String("identifier", identifier), zap.Any("args", arg))
defer func(identifier string) {
s.log.Debug("container finished", zap.String("identifier", identifier))
}(identifier)
// get image
image, err := s.container.client.GetImage(s.container.ctx, arg.Runtime.Image)
// TODO: we could cache the image struct
if err != nil {
return err
}
// create container
container, err := s.container.client.NewContainer(s.container.ctx, "task-"+identifier,
containerd.WithNewSnapshot("snapshot-"+identifier, image),
containerd.WithNewSpec(
oci.WithImageConfig(image),
oci.WithMemoryLimit(arg.Runtime.Memory),
oci.WithPidsLimit(arg.Runtime.Pid),
oci.WithMounts(arg.Runtime.Mount),
oci.WithProcessArgs(arg.Program.Args...),
oci.WithEnv(arg.Program.Env),
),
)
if err != nil {
return err
}
defer func(container containerd.Container, ctx context.Context, opts ...containerd.DeleteOpts) {
_ = container.Delete(ctx, opts...)
}(container, s.container.ctx, containerd.WithSnapshotCleanup)
// create task
task, err := container.NewTask(s.container.ctx, cio.NewCreator(cio.WithStreams(nil, writer, writer)))
if err != nil {
return err
}
defer func(task containerd.Task, ctx context.Context, opts ...containerd.ProcessDeleteOpts) {
_, _ = task.Delete(ctx, opts...)
}(task, s.container.ctx, containerd.WithProcessKill)
// wait
ctx2, cancel := context.WithTimeout(s.container.ctx, arg.Runtime.Timeout)
defer cancel()
exitStatusC, err := task.Wait(ctx2)
if err != nil {
return err
}
// start
err = task.Start(s.container.ctx)
if err != nil {
return err
}
// kill on timeout
status := <-exitStatusC
code, _, _ := status.Result()
if code == containerd.UnknownExitStatus {
// containerd is C/S architecture, timeout means grpc timeout, resulting in unknown exit status
// manually kill the task
s.log.Debug("container timeout", zap.String("identifier", identifier))
err := task.Kill(s.container.ctx, syscall.SIGKILL)
if err != nil {
return err
}
}
return nil
}
func (s *service) ContainerRunPool(arg *RunArgs) int {
return s.pool.AddTask(func() error { return s.ContainerRun(arg) })
}