Kubernetes RabbitMQ operator https://github.com/tekliner/rabbitmq-operator
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

546 lines
16 KiB

  1. package rabbitmq
  2. import (
  3. "context"
  4. "os"
  5. "reflect"
  6. "strconv"
  7. "time"
  8. "github.com/getsentry/raven-go"
  9. rabbitmqv1 "github.com/tekliner/rabbitmq-operator/pkg/apis/rabbitmq/v1"
  10. v1 "k8s.io/api/apps/v1"
  11. corev1 "k8s.io/api/core/v1"
  12. "k8s.io/apimachinery/pkg/api/errors"
  13. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  14. "k8s.io/apimachinery/pkg/runtime"
  15. "k8s.io/apimachinery/pkg/types"
  16. "sigs.k8s.io/controller-runtime/pkg/client"
  17. "sigs.k8s.io/controller-runtime/pkg/controller"
  18. "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
  19. "sigs.k8s.io/controller-runtime/pkg/event"
  20. "sigs.k8s.io/controller-runtime/pkg/handler"
  21. "sigs.k8s.io/controller-runtime/pkg/manager"
  22. "sigs.k8s.io/controller-runtime/pkg/predicate"
  23. "sigs.k8s.io/controller-runtime/pkg/reconcile"
  24. logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
  25. "sigs.k8s.io/controller-runtime/pkg/source"
  26. )
  27. func init() {
  28. sentryDSN := os.Getenv("SENTRY_DSN")
  29. if sentryDSN != "" {
  30. raven.SetDSN(sentryDSN)
  31. }
  32. }
// log is the package-level logger, derived from logf.Log under the name
// "controller_rabbitmq". Reconcile builds its per-request logger from it.
var log = logf.Log.WithName("controller_rabbitmq")
  34. /**
  35. * USER ACTION REQUIRED: This is a scaffold file intended for the user to modify with their own Controller
  36. * business logic. Delete these comments after modifying this file.*
  37. */
  38. // Add creates a new Rabbitmq Controller and adds it to the Manager. The Manager will set fields on the Controller
  39. // and Start it when the Manager is Started.
  40. func Add(mgr manager.Manager) error {
  41. return add(mgr, newReconciler(mgr))
  42. }
  43. // newReconciler returns a new reconcile.Reconciler
  44. func newReconciler(mgr manager.Manager) reconcile.Reconciler {
  45. return &ReconcileRabbitmq{client: mgr.GetClient(), scheme: mgr.GetScheme()}
  46. }
  47. // add adds a new Controller to mgr with r as the reconcile.Reconciler
  48. func add(mgr manager.Manager, reconciler reconcile.Reconciler) error {
  49. // Create a new controller
  50. c, err := controller.New("rabbitmq-controller", mgr, controller.Options{Reconciler: reconciler})
  51. if err != nil {
  52. raven.CaptureErrorAndWait(err, nil)
  53. return err
  54. }
  55. // Watch for changes to primary resource Rabbitmq
  56. err = c.Watch(&source.Kind{Type: &rabbitmqv1.Rabbitmq{}}, &handler.EnqueueRequestForObject{})
  57. if err != nil {
  58. raven.CaptureErrorAndWait(err, nil)
  59. return err
  60. }
  61. // TODO(user): Modify this to be the types you create that are owned by the primary resource
  62. // Watch for changes to secondary resource Pods and requeue the owner Rabbitmq
  63. err = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForOwner{
  64. IsController: true,
  65. OwnerType: &rabbitmqv1.Rabbitmq{},
  66. })
  67. if err != nil {
  68. raven.CaptureErrorAndWait(err, nil)
  69. return err
  70. }
  71. // Watch for changes to secondary resource StatefulSets and requeue the owner
  72. err = c.Watch(&source.Kind{Type: &v1.StatefulSet{}}, &handler.EnqueueRequestForOwner{
  73. OwnerType: &rabbitmqv1.Rabbitmq{},
  74. })
  75. if err != nil {
  76. raven.CaptureErrorAndWait(err, nil)
  77. return err
  78. }
  79. mapFn := handler.ToRequestsFunc(
  80. func(a handler.MapObject) []reconcile.Request {
  81. return []reconcile.Request{
  82. {NamespacedName: types.NamespacedName{Name: a.Meta.GetLabels()["app.improvado.io/instance"], Namespace: a.Meta.GetNamespace()}},
  83. }
  84. })
  85. p := predicate.Funcs{
  86. UpdateFunc: func(e event.UpdateEvent) bool {
  87. if _, ok := e.MetaOld.GetLabels()["app.improvado.io/instance"]; !ok {
  88. return false
  89. }
  90. return e.ObjectOld != e.ObjectNew
  91. },
  92. CreateFunc: func(e event.CreateEvent) bool {
  93. if _, ok := e.Meta.GetLabels()["app.improvado.io/instance"]; !ok {
  94. return false
  95. }
  96. return true
  97. },
  98. }
  99. err = c.Watch(
  100. &source.Kind{Type: &corev1.Secret{}},
  101. &handler.EnqueueRequestsFromMapFunc{
  102. ToRequests: mapFn,
  103. }, p)
  104. if err != nil {
  105. raven.CaptureErrorAndWait(err, nil)
  106. return err
  107. }
  108. return nil
  109. }
// Compile-time assertion that *ReconcileRabbitmq implements reconcile.Reconciler.
var _ reconcile.Reconciler = &ReconcileRabbitmq{}
// ReconcileRabbitmq reconciles a Rabbitmq object
type ReconcileRabbitmq struct {
	// client is set from mgr.GetClient() in newReconciler. Per the scaffold
	// comment it is a split client that reads objects from the cache and
	// writes to the apiserver.
	client client.Client
	// scheme is set from mgr.GetScheme() in newReconciler; Reconcile passes it
	// to controllerutil.SetControllerReference.
	scheme *runtime.Scheme
}
  118. // Reconcile reads that state of the cluster for a Rabbitmq object and makes changes based on the state read
  119. // and what is in the Rabbitmq.Spec
  120. // TODO(user): Modify this Reconcile function to implement your Controller logic. This example creates
  121. // a Pod as an example
  122. // Note:
  123. // The Controller will requeue the Request to be processed again if the returned error is non-nil or
  124. // Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
  125. func mergeMaps(itermaps ...map[string]string) map[string]string {
  126. result := make(map[string]string)
  127. for _, rv := range itermaps {
  128. for k, v := range rv {
  129. result[k] = v
  130. }
  131. }
  132. return result
  133. }
  134. func returnLabels(cr *rabbitmqv1.Rabbitmq) map[string]string {
  135. labels := map[string]string{
  136. "app.improvado.io/application": "rabbitmq",
  137. "app.improvado.io/instance": cr.Name,
  138. }
  139. return labels
  140. }
  141. func returnAnnotationsPrometheus(cr *rabbitmqv1.Rabbitmq) map[string]string {
  142. return map[string]string{
  143. "prometheus.io/scrape": "true",
  144. "prometheus.io/port": strconv.Itoa(int(cr.Spec.RabbitmqPrometheusExporterPort)),
  145. }
  146. }
  147. func returnAnnotations(cr *rabbitmqv1.Rabbitmq) map[string]string {
  148. annotations := map[string]string{}
  149. if cr.Spec.RabbitmqPrometheusExporterPort > 0 {
  150. annotations = mergeMaps(annotations, returnAnnotationsPrometheus(cr))
  151. }
  152. return annotations
  153. }
  154. // Reconcile method
  155. func (r *ReconcileRabbitmq) Reconcile(request reconcile.Request) (reconcile.Result, error) {
  156. reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
  157. reqLogger.Info("Reconciling Rabbitmq v1")
  158. // Fetch the Rabbitmq instance
  159. instance := &rabbitmqv1.Rabbitmq{}
  160. err := r.client.Get(context.TODO(), request.NamespacedName, instance)
  161. if err != nil {
  162. if errors.IsNotFound(err) {
  163. // Request object not statefulsetFound, could have been deleted after reconcile request.
  164. // Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
  165. // Return and don't requeue
  166. return reconcile.Result{}, nil
  167. }
  168. // Error reading the object - requeue the request.
  169. raven.CaptureErrorAndWait(err, nil)
  170. return reconcile.Result{}, err
  171. }
  172. // secrets used for API requests and user control
  173. reqLogger.Info("Reconciling secrets")
  174. secretNames, err := r.reconcileSecrets(reqLogger, instance)
  175. if err != nil {
  176. raven.CaptureErrorAndWait(err, nil)
  177. return reconcile.Result{}, err
  178. }
  179. statefulset := newStatefulSet(instance, secretNames)
  180. if err := controllerutil.SetControllerReference(instance, statefulset, r.scheme); err != nil {
  181. raven.CaptureErrorAndWait(err, nil)
  182. return reconcile.Result{}, err
  183. }
  184. statefulsetFound := &v1.StatefulSet{}
  185. err = r.client.Get(context.TODO(), types.NamespacedName{Name: statefulset.Name, Namespace: statefulset.Namespace}, statefulsetFound)
  186. if err != nil && errors.IsNotFound(err) {
  187. reqLogger.Info("Creating a new statefulset", "statefulset.Namespace", statefulset.Namespace, "statefulset.Name", statefulset.Name)
  188. err = r.client.Create(context.TODO(), statefulset)
  189. if err != nil {
  190. raven.CaptureErrorAndWait(err, nil)
  191. return reconcile.Result{}, err
  192. }
  193. // statefulset created successfully - don't requeue
  194. return reconcile.Result{}, nil
  195. } else if err != nil {
  196. raven.CaptureErrorAndWait(err, nil)
  197. return reconcile.Result{}, err
  198. }
  199. if !reflect.DeepEqual(statefulsetFound.Spec, statefulset.Spec) {
  200. statefulsetFound.Spec.Replicas = statefulset.Spec.Replicas
  201. statefulsetFound.Spec.Template = statefulset.Spec.Template
  202. }
  203. if !reflect.DeepEqual(statefulsetFound.Annotations, statefulset.Annotations) {
  204. statefulsetFound.Annotations = statefulset.Annotations
  205. }
  206. if !reflect.DeepEqual(statefulsetFound.Labels, statefulset.Labels) {
  207. statefulsetFound.Labels = statefulset.Labels
  208. }
  209. reqLogger.Info("Reconcile statefulset", "statefulset.Namespace", statefulsetFound.Namespace, "statefulset.Name", statefulsetFound.Name)
  210. if err = r.client.Update(context.TODO(), statefulsetFound); err != nil {
  211. reqLogger.Info("Reconcile statefulset error", "statefulset.Namespace", statefulsetFound.Namespace, "statefulset.Name", statefulsetFound.Name)
  212. raven.CaptureErrorAndWait(err, nil)
  213. return reconcile.Result{}, err
  214. }
  215. // creating services
  216. reqLogger.Info("Reconciling services")
  217. _, err = r.reconcileHTTPService(reqLogger, instance)
  218. if err != nil {
  219. raven.CaptureErrorAndWait(err, nil)
  220. return reconcile.Result{}, err
  221. }
  222. // all-in-one service
  223. _, err = r.reconcileHAService(reqLogger, instance)
  224. if err != nil {
  225. raven.CaptureErrorAndWait(err, nil)
  226. return reconcile.Result{}, err
  227. }
  228. _, err = r.reconcileDiscoveryService(reqLogger, instance)
  229. if err != nil {
  230. raven.CaptureErrorAndWait(err, nil)
  231. return reconcile.Result{}, err
  232. }
  233. // configmap
  234. reqLogger.Info("Reconciling configmap")
  235. _, err = r.reconcileConfigMap(reqLogger, instance, secretNames)
  236. if err != nil {
  237. raven.CaptureErrorAndWait(err, nil)
  238. return reconcile.Result{}, err
  239. }
  240. // check prometheus exporter flag
  241. if instance.Spec.RabbitmqPrometheusExporterPort > 0 {
  242. _, err = r.reconcilePrometheusExporterService(reqLogger, instance)
  243. if err != nil {
  244. raven.CaptureErrorAndWait(err, nil)
  245. return reconcile.Result{}, err
  246. }
  247. }
  248. // use ServiceMonitor?
  249. if instance.Spec.RabbitmqUseServiceMonitor {
  250. _, err = r.reconcilePrometheusExporterServiceMonitor(reqLogger, instance)
  251. if err != nil {
  252. raven.CaptureErrorAndWait(err, nil)
  253. return reconcile.Result{}, err
  254. }
  255. }
  256. // set policies
  257. reqLogger.Info("Setting up policies")
  258. timeoutPolicies, _ := time.ParseDuration("30")
  259. timeoutFlagPolicies := false
  260. ctxPolicies, ctxPoliciesCancelTimeout := context.WithTimeout(context.Background(), timeoutPolicies)
  261. defer ctxPoliciesCancelTimeout()
  262. go r.setPolicies(ctxPolicies, reqLogger, instance, secretNames)
  263. for {
  264. if timeoutFlagPolicies {
  265. break
  266. }
  267. select {
  268. case <-ctxPolicies.Done():
  269. timeoutFlagPolicies = true
  270. }
  271. }
  272. // set additional users
  273. reqLogger.Info("Setting up additional users")
  274. timeoutUsers, _ := time.ParseDuration("30")
  275. timeoutFlagUsers := false
  276. ctxUsers, ctxUsersCancelTimeout := context.WithTimeout(context.Background(), timeoutUsers)
  277. defer ctxUsersCancelTimeout()
  278. go r.syncUsersCredentials(ctxUsers, reqLogger, instance, secretNames)
  279. for {
  280. if timeoutFlagUsers {
  281. break
  282. }
  283. select {
  284. case <-ctxUsers.Done():
  285. timeoutFlagUsers = true
  286. }
  287. }
  288. // reconcile PodDisruptionBudget
  289. reqLogger.Info("Reconciling PodDisruptionBudget")
  290. _, err = r.reconcilePdb(reqLogger, instance)
  291. if err != nil {
  292. raven.CaptureErrorAndWait(err, nil)
  293. return reconcile.Result{}, err
  294. }
  295. _, err = r.reconcileFinalizers(reqLogger, instance)
  296. if err != nil {
  297. raven.CaptureErrorAndWait(err, nil)
  298. return reconcile.Result{}, err
  299. }
  300. return reconcile.Result{}, nil
  301. }
  302. func appendNodeVariables(env []corev1.EnvVar, cr *rabbitmqv1.Rabbitmq) []corev1.EnvVar {
  303. return append(env,
  304. corev1.EnvVar{
  305. Name: "RABBITMQ_USE_LONGNAME",
  306. Value: "true",
  307. },
  308. corev1.EnvVar{
  309. Name: "K8S_HOSTNAME_SUFFIX",
  310. Value: "." + cr.Name + "-discovery." + cr.Namespace + "." + cr.Spec.RabbitmqK8SServiceDiscovery,
  311. },
  312. corev1.EnvVar{
  313. Name: "K8S_SERVICE_NAME",
  314. Value: cr.Name + "-discovery",
  315. },
  316. corev1.EnvVar{
  317. Name: "MY_POD_NAME",
  318. ValueFrom: &corev1.EnvVarSource{
  319. FieldRef: &corev1.ObjectFieldSelector{
  320. FieldPath: "metadata.name",
  321. },
  322. },
  323. },
  324. corev1.EnvVar{
  325. Name: "RABBITMQ_NODENAME",
  326. Value: "rabbit@$(MY_POD_NAME)." + cr.Name + "-discovery." + cr.Namespace + "." + cr.Spec.RabbitmqK8SServiceDiscovery,
  327. },
  328. )
  329. }
  330. func newStatefulSet(cr *rabbitmqv1.Rabbitmq, secretNames secretResouces) *v1.StatefulSet {
  331. // prepare containers for pod
  332. podContainers := []corev1.Container{}
  333. // check affinity rules
  334. affinity := &corev1.Affinity{}
  335. if cr.Spec.RabbitmqAffinity != nil {
  336. affinity = cr.Spec.RabbitmqAffinity
  337. }
  338. // container with rabbitmq
  339. rabbitmqContainer := corev1.Container{
  340. Name: "rabbitmq",
  341. Image: cr.Spec.K8SImage.Name + ":" + cr.Spec.K8SImage.Tag,
  342. Env: append(appendNodeVariables(cr.Spec.K8SENV, cr), corev1.EnvVar{
  343. Name: "RABBITMQ_ERLANG_COOKIE",
  344. ValueFrom: &corev1.EnvVarSource{ConfigMapKeyRef: &corev1.ConfigMapKeySelector{LocalObjectReference: corev1.LocalObjectReference{Name: cr.Name}, Key: ".erlang.cookie"}},
  345. }),
  346. Resources: corev1.ResourceRequirements{
  347. Requests: cr.Spec.RabbitmqPodRequests,
  348. Limits: cr.Spec.RabbitmqPodLimits,
  349. },
  350. VolumeMounts: []corev1.VolumeMount{
  351. {
  352. Name: "rabbit-etc",
  353. MountPath: "/etc/rabbitmq",
  354. },
  355. {
  356. Name: "rabbit-data",
  357. MountPath: "/var/lib/rabbitmq",
  358. },
  359. {
  360. Name: "rabbit-config",
  361. MountPath: "/rabbit-config",
  362. },
  363. },
  364. }
  365. podContainers = append(podContainers, rabbitmqContainer)
  366. // if prometheus exporter enabled add additional container to pod
  367. if cr.Spec.RabbitmqPrometheusExporterPort > 0 {
  368. exporterImageAndTag := "kbudde/rabbitmq-exporter:latest"
  369. if cr.Spec.RabbitmqPrometheusImage != "" {
  370. exporterImageAndTag = cr.Spec.RabbitmqPrometheusImage
  371. }
  372. exporterContainer := corev1.Container{
  373. Name: "prometheus-exporter",
  374. Image: exporterImageAndTag,
  375. ImagePullPolicy: corev1.PullIfNotPresent,
  376. Env: []corev1.EnvVar{
  377. {
  378. Name: "RABBIT_USER",
  379. ValueFrom: &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{LocalObjectReference: corev1.LocalObjectReference{Name: secretNames.ServiceAccount}, Key: "username"}},
  380. },
  381. {
  382. Name: "RABBIT_PASSWORD",
  383. ValueFrom: &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{LocalObjectReference: corev1.LocalObjectReference{Name: secretNames.ServiceAccount}, Key: "password"}},
  384. },
  385. },
  386. Ports: []corev1.ContainerPort{
  387. {
  388. Name: "exporter",
  389. Protocol: corev1.ProtocolTCP,
  390. ContainerPort: cr.Spec.RabbitmqPrometheusExporterPort,
  391. },
  392. },
  393. }
  394. podContainers = append(podContainers, exporterContainer)
  395. }
  396. podTemplate := corev1.PodTemplateSpec{
  397. ObjectMeta: metav1.ObjectMeta{
  398. Labels: mergeMaps(returnLabels(cr),
  399. map[string]string{"app.improvado.io/component": "messaging"},
  400. ),
  401. Annotations: returnAnnotations(cr),
  402. },
  403. Spec: corev1.PodSpec{
  404. Affinity: affinity,
  405. ServiceAccountName: cr.Spec.RabbitmqK8SServiceAccount,
  406. InitContainers: []corev1.Container{
  407. {
  408. Name: "copy-rabbitmq-config",
  409. Image: "busybox",
  410. Command: []string{"sh", "/rabbit-config/init.sh"},
  411. VolumeMounts: []corev1.VolumeMount{
  412. {
  413. Name: "rabbit-etc",
  414. MountPath: "/etc/rabbitmq",
  415. },
  416. {
  417. Name: "rabbit-config",
  418. MountPath: "/rabbit-config",
  419. },
  420. {
  421. Name: "rabbit-data",
  422. MountPath: "/var/lib/rabbitmq",
  423. },
  424. },
  425. },
  426. },
  427. Containers: podContainers,
  428. Tolerations: cr.Spec.Tolerations,
  429. NodeSelector: cr.Spec.NodeSelector,
  430. Volumes: []corev1.Volume{
  431. {
  432. Name: "rabbit-config",
  433. VolumeSource: corev1.VolumeSource{
  434. ConfigMap: &corev1.ConfigMapVolumeSource{
  435. LocalObjectReference: corev1.LocalObjectReference{
  436. Name: cr.Name,
  437. },
  438. },
  439. },
  440. },
  441. {
  442. Name: "rabbit-etc",
  443. VolumeSource: corev1.VolumeSource{
  444. EmptyDir: &corev1.EmptyDirVolumeSource{},
  445. },
  446. },
  447. },
  448. },
  449. }
  450. PVCTemplate := corev1.PersistentVolumeClaim{
  451. ObjectMeta: metav1.ObjectMeta{
  452. Name: "rabbit-data",
  453. Finalizers: cr.ObjectMeta.Finalizers,
  454. },
  455. Spec: corev1.PersistentVolumeClaimSpec{
  456. AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
  457. Resources: corev1.ResourceRequirements{
  458. Requests: corev1.ResourceList{
  459. corev1.ResourceStorage: cr.Spec.RabbitmqVolumeSize,
  460. },
  461. },
  462. },
  463. }
  464. return &v1.StatefulSet{
  465. ObjectMeta: metav1.ObjectMeta{
  466. Name: cr.Name,
  467. Namespace: cr.Namespace,
  468. Labels: mergeMaps(returnLabels(cr),
  469. map[string]string{"app.improvado.io/component": "messaging"},
  470. ),
  471. },
  472. Spec: v1.StatefulSetSpec{
  473. Replicas: &cr.Spec.RabbitmqReplicas,
  474. ServiceName: cr.Name + "-discovery",
  475. Selector: &metav1.LabelSelector{
  476. MatchLabels: returnLabels(cr),
  477. },
  478. Template: podTemplate,
  479. VolumeClaimTemplates: []corev1.PersistentVolumeClaim{PVCTemplate},
  480. UpdateStrategy: v1.StatefulSetUpdateStrategy{
  481. Type: v1.RollingUpdateStatefulSetStrategyType,
  482. },
  483. },
  484. }
  485. }