Operator for CronJob resources
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

171 lines
5.6 KiB

  1. package cronop
  2. import (
  3. "context"
  4. "github.com/go-logr/logr"
  5. batchv1beta1 "k8s.io/api/batch/v1beta1"
  6. blindagev1alpha1 "git.blindage.org/21h/cron-operator/pkg/apis/blindage/v1alpha1"
  7. corev1 "k8s.io/api/core/v1"
  8. "k8s.io/apimachinery/pkg/api/errors"
  9. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  10. "k8s.io/apimachinery/pkg/runtime"
  11. "k8s.io/apimachinery/pkg/types"
  12. "sigs.k8s.io/controller-runtime/pkg/client"
  13. "sigs.k8s.io/controller-runtime/pkg/controller"
  14. "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
  15. "sigs.k8s.io/controller-runtime/pkg/handler"
  16. "sigs.k8s.io/controller-runtime/pkg/manager"
  17. "sigs.k8s.io/controller-runtime/pkg/reconcile"
  18. logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
  19. "sigs.k8s.io/controller-runtime/pkg/source"
  20. )
// log is the package-level logger; it is derived from logf.Log and named
// "controller_cronop". Reconcile builds its per-request logger from it.
var log = logf.Log.WithName("controller_cronop")
  22. /**
  23. * USER ACTION REQUIRED: This is a scaffold file intended for the user to modify with their own Controller
  24. * business logic. Delete these comments after modifying this file.*
  25. */
  26. // Add creates a new CronOp Controller and adds it to the Manager. The Manager will set fields on the Controller
  27. // and Start it when the Manager is Started.
  28. func Add(mgr manager.Manager) error {
  29. return add(mgr, newReconciler(mgr))
  30. }
  31. // newReconciler returns a new reconcile.Reconciler
  32. func newReconciler(mgr manager.Manager) reconcile.Reconciler {
  33. return &ReconcileCronOp{client: mgr.GetClient(), scheme: mgr.GetScheme()}
  34. }
  35. // add adds a new Controller to mgr with r as the reconcile.Reconciler
  36. func add(mgr manager.Manager, r reconcile.Reconciler) error {
  37. // Create a new controller
  38. c, err := controller.New("cronop-controller", mgr, controller.Options{Reconciler: r})
  39. if err != nil {
  40. return err
  41. }
  42. // Watch for changes to primary resource CronOp
  43. err = c.Watch(&source.Kind{Type: &blindagev1alpha1.CronOp{}}, &handler.EnqueueRequestForObject{})
  44. if err != nil {
  45. return err
  46. }
  47. return nil
  48. }
// Blank assignment: a compile-time check that *ReconcileCronOp implements
// reconcile.Reconciler.
var _ reconcile.Reconciler = &ReconcileCronOp{}

// ReconcileCronOp reconciles a CronOp object. Instances are built by
// newReconciler from a manager.
type ReconcileCronOp struct {
	// client is obtained from mgr.GetClient() in newReconciler. It is a split
	// client that reads objects from the cache and writes to the apiserver.
	client client.Client
	// scheme is obtained from mgr.GetScheme() in newReconciler; Reconcile
	// passes it to controllerutil.SetControllerReference.
	scheme *runtime.Scheme
}
  58. // Reconcile reads that state of the cluster for a CronOp object and makes changes based on the state read
  59. // and what is in the CronOp.Spec
  60. func (r *ReconcileCronOp) Reconcile(request reconcile.Request) (reconcile.Result, error) {
  61. reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
  62. reqLogger.Info("Reconciling CronOp")
  63. // Fetch the CronOp instance
  64. instance := &blindagev1alpha1.CronOp{}
  65. err := r.client.Get(context.TODO(), request.NamespacedName, instance)
  66. if err != nil {
  67. if errors.IsNotFound(err) {
  68. return reconcile.Result{}, nil
  69. }
  70. // Error reading the object - requeue the request.
  71. return reconcile.Result{}, err
  72. }
  73. // Iterate tasks in resource
  74. for _, task := range instance.Spec.Tasks {
  75. cronjob := generateCronjob(reqLogger, instance, task)
  76. // Set CronOp instance as the owner and controller
  77. if err := controllerutil.SetControllerReference(instance, &cronjob, r.scheme); err != nil {
  78. return reconcile.Result{}, err
  79. }
  80. // Check if this CronJob already exists
  81. found := &batchv1beta1.CronJob{}
  82. err = r.client.Get(context.TODO(), types.NamespacedName{Name: cronjob.Name, Namespace: cronjob.Namespace}, found)
  83. if err != nil && errors.IsNotFound(err) {
  84. reqLogger.Info("Creating a new CronJob", "Namespace", cronjob.Namespace, "Name", cronjob.Name)
  85. err = r.client.Create(context.TODO(), &cronjob)
  86. if err != nil {
  87. return reconcile.Result{}, err
  88. }
  89. // Created successfully - don't requeue
  90. return reconcile.Result{}, nil
  91. } else if err != nil {
  92. return reconcile.Result{}, err
  93. }
  94. }
  95. reqLogger.Info("Reconcile complete", "Namespace", instance.Namespace, "Name", instance.Name)
  96. return reconcile.Result{}, nil
  97. }
  98. // generate one cronjob for specific task
  99. func generateCronjob(reqLogger logr.Logger, cr *blindagev1alpha1.CronOp, jobSpec blindagev1alpha1.CronTask) batchv1beta1.CronJob {
  100. labels := map[string]string{
  101. "operator": "cron-operator",
  102. "cronop": cr.Name,
  103. }
  104. imageName := "busybox"
  105. if cr.Spec.Image != "" {
  106. reqLogger.Info("Found Image override", "Namespace", cr.Namespace, "Name", cr.Name, "Image", cr.Spec.Image)
  107. imageName = cr.Spec.Image
  108. }
  109. cronjob := batchv1beta1.CronJob{
  110. ObjectMeta: metav1.ObjectMeta{
  111. Name: cr.Name + "-" + jobSpec.Name,
  112. Namespace: cr.Namespace,
  113. Labels: labels,
  114. },
  115. Spec: batchv1beta1.CronJobSpec{},
  116. }
  117. cronjob.Spec.Schedule = "0 * * * *"
  118. if jobSpec.Schedule != "" {
  119. reqLogger.Info("Found Schedule override", "Namespace", cr.Namespace, "Name", cr.Name, "Schedule", jobSpec.Schedule)
  120. cronjob.Spec.Schedule = jobSpec.Schedule
  121. }
  122. cronjob.Spec.ConcurrencyPolicy = batchv1beta1.AllowConcurrent
  123. if jobSpec.ConcurrencyPolicy != nil {
  124. reqLogger.Info("Found Concurrency override", "Namespace", cr.Namespace, "Name", cr.Name, "Concurrency", jobSpec.ConcurrencyPolicy)
  125. cronjob.Spec.ConcurrencyPolicy = *jobSpec.ConcurrencyPolicy
  126. }
  127. cronjob.Spec.JobTemplate.Spec.Template.Spec.RestartPolicy = corev1.RestartPolicyOnFailure
  128. if jobSpec.RestartPolicy != nil {
  129. reqLogger.Info("Found RestartPolicy override", "Namespace", cr.Namespace, "Name", cr.Name, "RestartPolicy", jobSpec.RestartPolicy)
  130. cronjob.Spec.JobTemplate.Spec.Template.Spec.RestartPolicy = *jobSpec.RestartPolicy
  131. }
  132. cronjob.Spec.JobTemplate.Spec.Template.Spec.Containers = []corev1.Container{
  133. corev1.Container{
  134. Name: "crontask",
  135. Image: imageName,
  136. Command: jobSpec.Commands,
  137. },
  138. }
  139. return cronjob
  140. }