Browse Source

#5 add context to executeTasks()

#6 Run as other user
tags/version-1.0
Vladimir Smagin 8 months ago
parent
commit
90eb22c874
4 changed files with 105 additions and 45 deletions
  1. 4
    2
      configs/test1.yml
  2. 5
    2
      configs/test2.yml
  3. 51
    1
      file_utils.go
  4. 45
    40
      gogocron.go

+ 4
- 2
configs/test1.yml View File

@@ -1,10 +1,12 @@
---
name: "Print base64 of 20 random symbols"
runsecond: "*/5"
timeout: 1
timeout: 4s
env:
- TESTVAR="test variable"
commands:
- "head -c 20 /dev/urandom |base64"
- whoami
- sleep 3
- head -c 20 /dev/urandom |base64
- ls
- echo $TESTVAR

+ 5
- 2
configs/test2.yml View File

@@ -1,6 +1,9 @@
---
name: "Test task #2"
runsecond: "*/6"
timeout: 1
runsecond: "*/15"
timeout: 1s
user: root
commands:
- "head -c 2 /dev/urandom |base64"
- "sleep 2"
- "echo ooops"

+ 51
- 1
file_utils.go View File

@@ -1,9 +1,16 @@
package main

import (
"context"
"io"
"io/ioutil"
"log"
"os"
"os/exec"
"syscall"
"time"

"github.com/kr/pty" // required to run commands as other users
"gopkg.in/yaml.v2"
)

@@ -37,8 +44,51 @@ func readConfigFile(path string) cronTask {

marshErr := yaml.Unmarshal(yamldata, &config)
if marshErr != nil {
log.Fatalf("Error parsing file %v", marshErr)
log.Fatalf("Task file malformed: %v", marshErr)
}

return config
}

// run commands
func runCmd(ctx context.Context, env []string, cmdname string, params ...string) error {
log.Println("runCmd:", cmdname, params)
cmd := exec.Command(cmdname, params...)

// set env variables
cmd.Env = append(cmd.Env, os.Environ()...)
cmd.Env = append(cmd.Env, env...)

log.Printf("runCmd: env %v", env)
// set stdout, stderr
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr

//cmd.Start()
f, err := pty.Start(cmd)
if err != nil {
panic(err)
}
io.Copy(os.Stdout, f)
// Use a channel to signal completion so we can use a select statement
done := make(chan error)
go func() { done <- cmd.Wait() }()

// The select statement allows us to execute based on which channel
// we get a message from first.
select {
case <-ctx.Done():
// Timeout happened first, kill the process and print a message.
cmd.Process.Signal(syscall.SIGTERM)
time.AfterFunc(10*time.Second, func() {
if !cmd.ProcessState.Exited() {
cmd.Process.Kill()
}
})
return ctx.Err()
case err := <-done:
return err
}

return nil
}

+ 45
- 40
gogocron.go View File

@@ -4,7 +4,6 @@ import (
"context"
"log"
"os"
"os/exec"
"os/signal"
"syscall"
"time"
@@ -12,13 +11,14 @@ import (

type cronTask struct {
Name string `yaml:"name"` //name of task
AsUser string `yaml:"user,omitempty"` //run as user
RunSecond string `yaml:"runsecond,omitempty"` // second
RunMinute string `yaml:"runminute,omitempty"` // minute
RunHour string `yaml:"runhour,omitempty"` // hour
RunDom string `yaml:"rundom,omitempty"` // day of month
RunMonth string `yaml:"runmonth,omitempty"` // month
RunDow string `yaml:"rundow,omitempty"` // day of week
TimeOut int `yaml:"timeout,omitempty"` // exec with timeout, seconds
TimeOut string `yaml:"timeout,omitempty"` // exec with timeout, seconds
Env []string `yaml:"env,omitempty"` // array of env variables
Commands []string `yaml:"commands"` // array of commands to exec
}
@@ -63,51 +63,56 @@ func filterTasksToExecute(tasks cronTasks) cronTasks {
return tasksToExecute
}

func runCmd(ctx context.Context, env []string, cmdname string, params ...string) error {
log.Println("runCmd:", cmdname, params)
cmd := exec.Command(cmdname, params...)

// set env variables
cmd.Env = append(cmd.Env, os.Environ()...)
cmd.Env = append(cmd.Env, env...)

log.Printf("runCmd: env %v", env)
// set stdout, stderr
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr

cmd.Start()

// Use a channel to signal completion so we can use a select statement
done := make(chan error)
go func() { done <- cmd.Wait() }()

// The select statement allows us to execute based on which channel
// we get a message from first.
select {
case <-ctx.Done():
// Timeout happened first, kill the process and print a message.
cmd.Process.Signal(syscall.SIGTERM)
time.AfterFunc(10*time.Second, func() {
if !cmd.ProcessState.Exited() {
cmd.Process.Kill()
}
})
return ctx.Err()
case err := <-done:
return err
}

return nil
}

// executeTasks runs batch of planned tasks on go routines with context
func executeTasks(tasks cronTasks) {
now := time.Now()
log.Printf("Execute started: %v", now)

// run every task in own context
ctx, ctxCancel := context.WithCancel(context.Background())
defer ctxCancel()

// iterate tasks and run them in go routine
for _, task := range tasks {
log.Printf("\ttask: %#v", task.Name)
log.Printf("\t\tcommands: %#v", task.Commands)

if task.TimeOut != "" {
timeout, err := time.ParseDuration(task.TimeOut)
if err != nil {
log.Fatalln("Error, task definition malformed!", task.Name)
}
ctx, ctxCancelTimeout := context.WithTimeout(ctx, timeout)
defer ctxCancelTimeout()
go executeTask(ctx, task)
} else {
go executeTask(ctx, task)
}

}

select {}

}

func executeTask(ctx context.Context, task cronTask) {
// flag to stop timed out tasks
timeoutFlag := false

// iterate commands list
for _, cmd := range task.Commands {
if timeoutFlag {
break
}

select {
case <-ctx.Done():
log.Printf("Timeout exceeded for task %v", task.Name)
timeoutFlag = true
default:
// su required to run as other users, requires pty to run
runCmd(ctx, task.Env, "su", "-c", cmd, task.AsUser)
}
}

}

Loading…
Cancel
Save