Kubernetes RabbitMQ operator https://github.com/tekliner/rabbitmq-operator
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

545 lines
16 KiB

  1. package rabbitmq
  2. import (
  3. "context"
  4. "os"
  5. "reflect"
  6. "strconv"
  7. "time"
  8. "github.com/getsentry/raven-go"
  9. rabbitmqv1 "github.com/tekliner/rabbitmq-operator/pkg/apis/rabbitmq/v1"
  10. v1 "k8s.io/api/apps/v1"
  11. corev1 "k8s.io/api/core/v1"
  12. "k8s.io/apimachinery/pkg/api/errors"
  13. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  14. "k8s.io/apimachinery/pkg/runtime"
  15. "k8s.io/apimachinery/pkg/types"
  16. "sigs.k8s.io/controller-runtime/pkg/client"
  17. "sigs.k8s.io/controller-runtime/pkg/controller"
  18. "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
  19. "sigs.k8s.io/controller-runtime/pkg/event"
  20. "sigs.k8s.io/controller-runtime/pkg/handler"
  21. "sigs.k8s.io/controller-runtime/pkg/manager"
  22. "sigs.k8s.io/controller-runtime/pkg/predicate"
  23. "sigs.k8s.io/controller-runtime/pkg/reconcile"
  24. logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
  25. "sigs.k8s.io/controller-runtime/pkg/source"
  26. )
  27. func init() {
  28. sentryDSN := os.Getenv("SENTRY_DSN")
  29. if sentryDSN != "" {
  30. raven.SetDSN(sentryDSN)
  31. }
  32. }
  33. var log = logf.Log.WithName("controller_rabbitmq")
  34. /**
  35. * USER ACTION REQUIRED: This is a scaffold file intended for the user to modify with their own Controller
  36. * business logic. Delete these comments after modifying this file.*
  37. */
  38. // Add creates a new Rabbitmq Controller and adds it to the Manager. The Manager will set fields on the Controller
  39. // and Start it when the Manager is Started.
  40. func Add(mgr manager.Manager) error {
  41. return add(mgr, newReconciler(mgr))
  42. }
  43. // newReconciler returns a new reconcile.Reconciler
  44. func newReconciler(mgr manager.Manager) reconcile.Reconciler {
  45. return &ReconcileRabbitmq{client: mgr.GetClient(), scheme: mgr.GetScheme()}
  46. }
  47. // add adds a new Controller to mgr with r as the reconcile.Reconciler
  48. func add(mgr manager.Manager, reconciler reconcile.Reconciler) error {
  49. // Create a new controller
  50. c, err := controller.New("rabbitmq-controller", mgr, controller.Options{Reconciler: reconciler})
  51. if err != nil {
  52. raven.CaptureErrorAndWait(err, nil)
  53. return err
  54. }
  55. // Watch for changes to primary resource Rabbitmq
  56. err = c.Watch(&source.Kind{Type: &rabbitmqv1.Rabbitmq{}}, &handler.EnqueueRequestForObject{})
  57. if err != nil {
  58. raven.CaptureErrorAndWait(err, nil)
  59. return err
  60. }
  61. // TODO(user): Modify this to be the types you create that are owned by the primary resource
  62. // Watch for changes to secondary resource Pods and requeue the owner Rabbitmq
  63. err = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForOwner{
  64. IsController: true,
  65. OwnerType: &rabbitmqv1.Rabbitmq{},
  66. })
  67. if err != nil {
  68. raven.CaptureErrorAndWait(err, nil)
  69. return err
  70. }
  71. // Watch for changes to secondary resource StatefulSets and requeue the owner
  72. err = c.Watch(&source.Kind{Type: &v1.StatefulSet{}}, &handler.EnqueueRequestForOwner{
  73. OwnerType: &rabbitmqv1.Rabbitmq{},
  74. })
  75. if err != nil {
  76. raven.CaptureErrorAndWait(err, nil)
  77. return err
  78. }
  79. mapFn := handler.ToRequestsFunc(
  80. func(a handler.MapObject) []reconcile.Request {
  81. return []reconcile.Request{
  82. {NamespacedName: types.NamespacedName{Name: a.Meta.GetLabels()["rabbitmq.improvado.io/instance"], Namespace: a.Meta.GetNamespace()}},
  83. }
  84. })
  85. p := predicate.Funcs{
  86. UpdateFunc: func(e event.UpdateEvent) bool {
  87. if _, ok := e.MetaOld.GetLabels()["rabbitmq.improvado.io/instance"]; !ok {
  88. return false
  89. }
  90. return e.ObjectOld != e.ObjectNew
  91. },
  92. CreateFunc: func(e event.CreateEvent) bool {
  93. if _, ok := e.Meta.GetLabels()["rabbitmq.improvado.io/instance"]; !ok {
  94. return false
  95. }
  96. return true
  97. },
  98. }
  99. err = c.Watch(
  100. &source.Kind{Type: &corev1.Secret{}},
  101. &handler.EnqueueRequestsFromMapFunc{
  102. ToRequests: mapFn,
  103. }, p)
  104. if err != nil {
  105. raven.CaptureErrorAndWait(err, nil)
  106. return err
  107. }
  108. return nil
  109. }
  110. var _ reconcile.Reconciler = &ReconcileRabbitmq{}
  111. // ReconcileRabbitmq reconciles a Rabbitmq object
  112. type ReconcileRabbitmq struct {
  113. // This client, initialized using mgr.Client() above, is a split client
  114. // that reads objects from the cache and writes to the apiserver
  115. client client.Client
  116. scheme *runtime.Scheme
  117. }
  118. // Reconcile reads that state of the cluster for a Rabbitmq object and makes changes based on the state read
  119. // and what is in the Rabbitmq.Spec
  120. // TODO(user): Modify this Reconcile function to implement your Controller logic. This example creates
  121. // a Pod as an example
  122. // Note:
  123. // The Controller will requeue the Request to be processed again if the returned error is non-nil or
  124. // Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
  125. func mergeMaps(itermaps ...map[string]string) map[string]string {
  126. result := make(map[string]string)
  127. for _, rv := range itermaps {
  128. for k, v := range rv {
  129. result[k] = v
  130. }
  131. }
  132. return result
  133. }
  134. func returnLabels(cr *rabbitmqv1.Rabbitmq) map[string]string {
  135. labels := map[string]string{
  136. "rabbitmq.improvado.io/instance": cr.Name,
  137. }
  138. return labels
  139. }
  140. func returnAnnotationsPrometheus(cr *rabbitmqv1.Rabbitmq) map[string]string {
  141. return map[string]string{
  142. "prometheus.io/scrape": "true",
  143. "prometheus.io/port": strconv.Itoa(int(cr.Spec.RabbitmqPrometheusExporterPort)),
  144. }
  145. }
  146. func returnAnnotations(cr *rabbitmqv1.Rabbitmq) map[string]string {
  147. annotations := map[string]string{}
  148. if cr.Spec.RabbitmqPrometheusExporterPort > 0 {
  149. annotations = mergeMaps(annotations, returnAnnotationsPrometheus(cr))
  150. }
  151. return annotations
  152. }
  153. // Reconcile method
  154. func (r *ReconcileRabbitmq) Reconcile(request reconcile.Request) (reconcile.Result, error) {
  155. reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
  156. reqLogger.Info("Reconciling Rabbitmq v1")
  157. // Fetch the Rabbitmq instance
  158. instance := &rabbitmqv1.Rabbitmq{}
  159. err := r.client.Get(context.TODO(), request.NamespacedName, instance)
  160. if err != nil {
  161. if errors.IsNotFound(err) {
  162. // Request object not statefulsetFound, could have been deleted after reconcile request.
  163. // Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
  164. // Return and don't requeue
  165. return reconcile.Result{}, nil
  166. }
  167. // Error reading the object - requeue the request.
  168. raven.CaptureErrorAndWait(err, nil)
  169. return reconcile.Result{}, err
  170. }
  171. // secrets used for API requests and user control
  172. reqLogger.Info("Reconciling secrets")
  173. secretNames, err := r.reconcileSecrets(reqLogger, instance)
  174. if err != nil {
  175. raven.CaptureErrorAndWait(err, nil)
  176. return reconcile.Result{}, err
  177. }
  178. statefulset := newStatefulSet(instance, secretNames)
  179. if err := controllerutil.SetControllerReference(instance, statefulset, r.scheme); err != nil {
  180. raven.CaptureErrorAndWait(err, nil)
  181. return reconcile.Result{}, err
  182. }
  183. statefulsetFound := &v1.StatefulSet{}
  184. err = r.client.Get(context.TODO(), types.NamespacedName{Name: statefulset.Name, Namespace: statefulset.Namespace}, statefulsetFound)
  185. if err != nil && errors.IsNotFound(err) {
  186. reqLogger.Info("Creating a new statefulset", "statefulset.Namespace", statefulset.Namespace, "statefulset.Name", statefulset.Name)
  187. err = r.client.Create(context.TODO(), statefulset)
  188. if err != nil {
  189. raven.CaptureErrorAndWait(err, nil)
  190. return reconcile.Result{}, err
  191. }
  192. // statefulset created successfully - don't requeue
  193. return reconcile.Result{}, nil
  194. } else if err != nil {
  195. raven.CaptureErrorAndWait(err, nil)
  196. return reconcile.Result{}, err
  197. }
  198. if !reflect.DeepEqual(statefulsetFound.Spec, statefulset.Spec) {
  199. statefulsetFound.Spec.Replicas = statefulset.Spec.Replicas
  200. statefulsetFound.Spec.Template = statefulset.Spec.Template
  201. statefulsetFound.Spec.Selector = statefulset.Spec.Selector
  202. }
  203. if !reflect.DeepEqual(statefulsetFound.Annotations, statefulset.Annotations) {
  204. statefulsetFound.Annotations = statefulset.Annotations
  205. }
  206. if !reflect.DeepEqual(statefulsetFound.Labels, statefulset.Labels) {
  207. statefulsetFound.Labels = statefulset.Labels
  208. }
  209. reqLogger.Info("Reconcile statefulset", "statefulset.Namespace", statefulsetFound.Namespace, "statefulset.Name", statefulsetFound.Name)
  210. if err = r.client.Update(context.TODO(), statefulsetFound); err != nil {
  211. reqLogger.Info("Reconcile statefulset error", "statefulset.Namespace", statefulsetFound.Namespace, "statefulset.Name", statefulsetFound.Name)
  212. raven.CaptureErrorAndWait(err, nil)
  213. return reconcile.Result{}, err
  214. }
  215. // creating services
  216. reqLogger.Info("Reconciling services")
  217. _, err = r.reconcileHTTPService(reqLogger, instance)
  218. if err != nil {
  219. raven.CaptureErrorAndWait(err, nil)
  220. return reconcile.Result{}, err
  221. }
  222. // all-in-one service
  223. _, err = r.reconcileHAService(reqLogger, instance)
  224. if err != nil {
  225. raven.CaptureErrorAndWait(err, nil)
  226. return reconcile.Result{}, err
  227. }
  228. _, err = r.reconcileDiscoveryService(reqLogger, instance)
  229. if err != nil {
  230. raven.CaptureErrorAndWait(err, nil)
  231. return reconcile.Result{}, err
  232. }
  233. // configmap
  234. reqLogger.Info("Reconciling configmap")
  235. _, err = r.reconcileConfigMap(reqLogger, instance, secretNames)
  236. if err != nil {
  237. raven.CaptureErrorAndWait(err, nil)
  238. return reconcile.Result{}, err
  239. }
  240. // check prometheus exporter flag
  241. if instance.Spec.RabbitmqPrometheusExporterPort > 0 {
  242. _, err = r.reconcilePrometheusExporterService(reqLogger, instance)
  243. if err != nil {
  244. raven.CaptureErrorAndWait(err, nil)
  245. return reconcile.Result{}, err
  246. }
  247. }
  248. // use ServiceMonitor?
  249. if instance.Spec.RabbitmqUseServiceMonitor {
  250. _, err = r.reconcilePrometheusExporterServiceMonitor(reqLogger, instance)
  251. if err != nil {
  252. raven.CaptureErrorAndWait(err, nil)
  253. return reconcile.Result{}, err
  254. }
  255. }
  256. // set policies
  257. reqLogger.Info("Setting up policies")
  258. timeoutPolicies, _ := time.ParseDuration("30")
  259. timeoutFlagPolicies := false
  260. ctxPolicies, ctxPoliciesCancelTimeout := context.WithTimeout(context.Background(), timeoutPolicies)
  261. defer ctxPoliciesCancelTimeout()
  262. go r.setPolicies(ctxPolicies, reqLogger, instance, secretNames)
  263. for {
  264. if timeoutFlagPolicies {
  265. break
  266. }
  267. select {
  268. case <-ctxPolicies.Done():
  269. timeoutFlagPolicies = true
  270. }
  271. }
  272. // set additional users
  273. reqLogger.Info("Setting up additional users")
  274. timeoutUsers, _ := time.ParseDuration("30")
  275. timeoutFlagUsers := false
  276. ctxUsers, ctxUsersCancelTimeout := context.WithTimeout(context.Background(), timeoutUsers)
  277. defer ctxUsersCancelTimeout()
  278. go r.syncUsersCredentials(ctxUsers, reqLogger, instance, secretNames)
  279. for {
  280. if timeoutFlagUsers {
  281. break
  282. }
  283. select {
  284. case <-ctxUsers.Done():
  285. timeoutFlagUsers = true
  286. }
  287. }
  288. // reconcile PodDisruptionBudget
  289. reqLogger.Info("Reconciling PodDisruptionBudget")
  290. _, err = r.reconcilePdb(reqLogger, instance)
  291. if err != nil {
  292. raven.CaptureErrorAndWait(err, nil)
  293. return reconcile.Result{}, err
  294. }
  295. _, err = r.reconcileFinalizers(reqLogger, instance)
  296. if err != nil {
  297. raven.CaptureErrorAndWait(err, nil)
  298. return reconcile.Result{}, err
  299. }
  300. return reconcile.Result{}, nil
  301. }
  302. func appendNodeVariables(env []corev1.EnvVar, cr *rabbitmqv1.Rabbitmq) []corev1.EnvVar {
  303. return append(env,
  304. corev1.EnvVar{
  305. Name: "RABBITMQ_USE_LONGNAME",
  306. Value: "true",
  307. },
  308. corev1.EnvVar{
  309. Name: "K8S_HOSTNAME_SUFFIX",
  310. Value: "." + cr.Name + "-discovery." + cr.Namespace + "." + cr.Spec.RabbitmqK8SServiceDiscovery,
  311. },
  312. corev1.EnvVar{
  313. Name: "K8S_SERVICE_NAME",
  314. Value: cr.Name + "-discovery",
  315. },
  316. corev1.EnvVar{
  317. Name: "MY_POD_NAME",
  318. ValueFrom: &corev1.EnvVarSource{
  319. FieldRef: &corev1.ObjectFieldSelector{
  320. FieldPath: "metadata.name",
  321. },
  322. },
  323. },
  324. corev1.EnvVar{
  325. Name: "RABBITMQ_NODENAME",
  326. Value: "rabbit@$(MY_POD_NAME)." + cr.Name + "-discovery." + cr.Namespace + "." + cr.Spec.RabbitmqK8SServiceDiscovery,
  327. },
  328. )
  329. }
  330. func newStatefulSet(cr *rabbitmqv1.Rabbitmq, secretNames secretResouces) *v1.StatefulSet {
  331. // prepare containers for pod
  332. podContainers := []corev1.Container{}
  333. // check affinity rules
  334. affinity := &corev1.Affinity{}
  335. if cr.Spec.RabbitmqAffinity != nil {
  336. affinity = cr.Spec.RabbitmqAffinity
  337. }
  338. // container with rabbitmq
  339. rabbitmqContainer := corev1.Container{
  340. Name: "rabbitmq",
  341. Image: cr.Spec.K8SImage.Name + ":" + cr.Spec.K8SImage.Tag,
  342. Env: append(appendNodeVariables(cr.Spec.K8SENV, cr), corev1.EnvVar{
  343. Name: "RABBITMQ_ERLANG_COOKIE",
  344. ValueFrom: &corev1.EnvVarSource{ConfigMapKeyRef: &corev1.ConfigMapKeySelector{LocalObjectReference: corev1.LocalObjectReference{Name: cr.Name}, Key: ".erlang.cookie"}},
  345. }),
  346. Resources: corev1.ResourceRequirements{
  347. Requests: cr.Spec.RabbitmqPodRequests,
  348. Limits: cr.Spec.RabbitmqPodLimits,
  349. },
  350. VolumeMounts: []corev1.VolumeMount{
  351. {
  352. Name: "rabbit-etc",
  353. MountPath: "/etc/rabbitmq",
  354. },
  355. {
  356. Name: "rabbit-data",
  357. MountPath: "/var/lib/rabbitmq",
  358. },
  359. {
  360. Name: "rabbit-config",
  361. MountPath: "/rabbit-config",
  362. },
  363. },
  364. }
  365. podContainers = append(podContainers, rabbitmqContainer)
  366. // if prometheus exporter enabled add additional container to pod
  367. if cr.Spec.RabbitmqPrometheusExporterPort > 0 {
  368. exporterImageAndTag := "kbudde/rabbitmq-exporter:latest"
  369. if cr.Spec.RabbitmqPrometheusImage != "" {
  370. exporterImageAndTag = cr.Spec.RabbitmqPrometheusImage
  371. }
  372. exporterContainer := corev1.Container{
  373. Name: "prometheus-exporter",
  374. Image: exporterImageAndTag,
  375. ImagePullPolicy: corev1.PullIfNotPresent,
  376. Env: []corev1.EnvVar{
  377. {
  378. Name: "RABBIT_USER",
  379. ValueFrom: &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{LocalObjectReference: corev1.LocalObjectReference{Name: secretNames.ServiceAccount}, Key: "username"}},
  380. },
  381. {
  382. Name: "RABBIT_PASSWORD",
  383. ValueFrom: &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{LocalObjectReference: corev1.LocalObjectReference{Name: secretNames.ServiceAccount}, Key: "password"}},
  384. },
  385. },
  386. Ports: []corev1.ContainerPort{
  387. {
  388. Name: "exporter",
  389. Protocol: corev1.ProtocolTCP,
  390. ContainerPort: cr.Spec.RabbitmqPrometheusExporterPort,
  391. },
  392. },
  393. }
  394. podContainers = append(podContainers, exporterContainer)
  395. }
  396. podTemplate := corev1.PodTemplateSpec{
  397. ObjectMeta: metav1.ObjectMeta{
  398. Labels: returnLabels(cr),
  399. Annotations: returnAnnotations(cr),
  400. },
  401. Spec: corev1.PodSpec{
  402. Affinity: affinity,
  403. ServiceAccountName: cr.Spec.RabbitmqK8SServiceAccount,
  404. InitContainers: []corev1.Container{
  405. {
  406. Name: "copy-rabbitmq-config",
  407. Image: "busybox",
  408. Command: []string{"sh", "/rabbit-config/init.sh"},
  409. VolumeMounts: []corev1.VolumeMount{
  410. {
  411. Name: "rabbit-etc",
  412. MountPath: "/etc/rabbitmq",
  413. },
  414. {
  415. Name: "rabbit-config",
  416. MountPath: "/rabbit-config",
  417. },
  418. {
  419. Name: "rabbit-data",
  420. MountPath: "/var/lib/rabbitmq",
  421. },
  422. },
  423. },
  424. },
  425. Containers: podContainers,
  426. Tolerations: cr.Spec.Tolerations,
  427. NodeSelector: cr.Spec.NodeSelector,
  428. Volumes: []corev1.Volume{
  429. {
  430. Name: "rabbit-config",
  431. VolumeSource: corev1.VolumeSource{
  432. ConfigMap: &corev1.ConfigMapVolumeSource{
  433. LocalObjectReference: corev1.LocalObjectReference{
  434. Name: cr.Name,
  435. },
  436. },
  437. },
  438. },
  439. {
  440. Name: "rabbit-etc",
  441. VolumeSource: corev1.VolumeSource{
  442. EmptyDir: &corev1.EmptyDirVolumeSource{},
  443. },
  444. },
  445. },
  446. },
  447. }
  448. PVCTemplate := corev1.PersistentVolumeClaim{
  449. ObjectMeta: metav1.ObjectMeta{
  450. Name: "rabbit-data",
  451. Finalizers: cr.ObjectMeta.Finalizers,
  452. },
  453. Spec: corev1.PersistentVolumeClaimSpec{
  454. AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
  455. StorageClassName: cr.Spec.RabbitmqStorageClass,
  456. Resources: corev1.ResourceRequirements{
  457. Requests: corev1.ResourceList{
  458. corev1.ResourceStorage: cr.Spec.RabbitmqVolumeSize,
  459. },
  460. },
  461. },
  462. }
  463. return &v1.StatefulSet{
  464. ObjectMeta: metav1.ObjectMeta{
  465. Name: cr.Name,
  466. Namespace: cr.Namespace,
  467. Labels: mergeMaps(returnLabels(cr),
  468. map[string]string{"rabbitmq.improvado.io/component": "messaging"},
  469. ),
  470. },
  471. Spec: v1.StatefulSetSpec{
  472. Replicas: &cr.Spec.RabbitmqReplicas,
  473. ServiceName: cr.Name + "-discovery",
  474. Selector: &metav1.LabelSelector{
  475. MatchLabels: returnLabels(cr),
  476. },
  477. Template: podTemplate,
  478. VolumeClaimTemplates: []corev1.PersistentVolumeClaim{PVCTemplate},
  479. UpdateStrategy: v1.StatefulSetUpdateStrategy{
  480. Type: v1.RollingUpdateStatefulSetStrategyType,
  481. },
  482. },
  483. }
  484. }