Kubernetes RabbitMQ operator https://github.com/tekliner/rabbitmq-operator
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

537 lines
16 KiB

  1. package rabbitmq
  2. import (
  3. "context"
  4. "os"
  5. "reflect"
  6. "strconv"
  7. "time"
  8. "github.com/getsentry/raven-go"
  9. rabbitmqv1 "github.com/tekliner/rabbitmq-operator/pkg/apis/rabbitmq/v1"
  10. v1 "k8s.io/api/apps/v1"
  11. corev1 "k8s.io/api/core/v1"
  12. "k8s.io/apimachinery/pkg/api/errors"
  13. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  14. "k8s.io/apimachinery/pkg/runtime"
  15. "k8s.io/apimachinery/pkg/types"
  16. "sigs.k8s.io/controller-runtime/pkg/client"
  17. "sigs.k8s.io/controller-runtime/pkg/controller"
  18. "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
  19. "sigs.k8s.io/controller-runtime/pkg/event"
  20. "sigs.k8s.io/controller-runtime/pkg/handler"
  21. "sigs.k8s.io/controller-runtime/pkg/manager"
  22. "sigs.k8s.io/controller-runtime/pkg/predicate"
  23. "sigs.k8s.io/controller-runtime/pkg/reconcile"
  24. logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
  25. "sigs.k8s.io/controller-runtime/pkg/source"
  26. )
  27. func init() {
  28. sentryDSN := os.Getenv("SENTRY_DSN")
  29. if sentryDSN != "" {
  30. raven.SetDSN(sentryDSN)
  31. }
  32. }
// log is the package-level logger, registered under the name
// "controller_rabbitmq". Reconcile derives its per-request logger from it.
var log = logf.Log.WithName("controller_rabbitmq")
  34. /**
  35. * USER ACTION REQUIRED: This is a scaffold file intended for the user to modify with their own Controller
  36. * business logic. Delete these comments after modifying this file.*
  37. */
  38. // Add creates a new Rabbitmq Controller and adds it to the Manager. The Manager will set fields on the Controller
  39. // and Start it when the Manager is Started.
  40. func Add(mgr manager.Manager) error {
  41. return add(mgr, newReconciler(mgr))
  42. }
  43. // newReconciler returns a new reconcile.Reconciler
  44. func newReconciler(mgr manager.Manager) reconcile.Reconciler {
  45. return &ReconcileRabbitmq{client: mgr.GetClient(), scheme: mgr.GetScheme()}
  46. }
  47. // add adds a new Controller to mgr with r as the reconcile.Reconciler
  48. func add(mgr manager.Manager, reconciler reconcile.Reconciler) error {
  49. // Create a new controller
  50. c, err := controller.New("rabbitmq-controller", mgr, controller.Options{Reconciler: reconciler})
  51. if err != nil {
  52. raven.CaptureErrorAndWait(err, nil)
  53. return err
  54. }
  55. // Watch for changes to primary resource Rabbitmq
  56. err = c.Watch(&source.Kind{Type: &rabbitmqv1.Rabbitmq{}}, &handler.EnqueueRequestForObject{})
  57. if err != nil {
  58. raven.CaptureErrorAndWait(err, nil)
  59. return err
  60. }
  61. // TODO(user): Modify this to be the types you create that are owned by the primary resource
  62. // Watch for changes to secondary resource Pods and requeue the owner Rabbitmq
  63. err = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForOwner{
  64. IsController: true,
  65. OwnerType: &rabbitmqv1.Rabbitmq{},
  66. })
  67. if err != nil {
  68. raven.CaptureErrorAndWait(err, nil)
  69. return err
  70. }
  71. // Watch for changes to secondary resource StatefulSets and requeue the owner
  72. err = c.Watch(&source.Kind{Type: &v1.StatefulSet{}}, &handler.EnqueueRequestForOwner{
  73. OwnerType: &rabbitmqv1.Rabbitmq{},
  74. })
  75. if err != nil {
  76. raven.CaptureErrorAndWait(err, nil)
  77. return err
  78. }
  79. mapFn := handler.ToRequestsFunc(
  80. func(a handler.MapObject) []reconcile.Request {
  81. return []reconcile.Request{
  82. {NamespacedName: types.NamespacedName{Name: a.Meta.GetLabels()["app.improvado.io/instance"], Namespace: a.Meta.GetNamespace()}},
  83. }
  84. })
  85. p := predicate.Funcs{
  86. UpdateFunc: func(e event.UpdateEvent) bool {
  87. if _, ok := e.MetaOld.GetLabels()["app.improvado.io/instance"]; !ok {
  88. return false
  89. }
  90. return e.ObjectOld != e.ObjectNew
  91. },
  92. CreateFunc: func(e event.CreateEvent) bool {
  93. if _, ok := e.Meta.GetLabels()["app.improvado.io/instance"]; !ok {
  94. return false
  95. }
  96. return true
  97. },
  98. }
  99. err = c.Watch(
  100. &source.Kind{Type: &corev1.Secret{}},
  101. &handler.EnqueueRequestsFromMapFunc{
  102. ToRequests: mapFn,
  103. }, p)
  104. if err != nil {
  105. raven.CaptureErrorAndWait(err, nil)
  106. return err
  107. }
  108. return nil
  109. }
  110. var _ reconcile.Reconciler = &ReconcileRabbitmq{}
  111. // ReconcileRabbitmq reconciles a Rabbitmq object
  112. type ReconcileRabbitmq struct {
  113. // This client, initialized using mgr.Client() above, is a split client
  114. // that reads objects from the cache and writes to the apiserver
  115. client client.Client
  116. scheme *runtime.Scheme
  117. }
  118. // Reconcile reads that state of the cluster for a Rabbitmq object and makes changes based on the state read
  119. // and what is in the Rabbitmq.Spec
  120. // TODO(user): Modify this Reconcile function to implement your Controller logic. This example creates
  121. // a Pod as an example
  122. // Note:
  123. // The Controller will requeue the Request to be processed again if the returned error is non-nil or
  124. // Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
  125. func mergeMaps(itermaps ...map[string]string) map[string]string {
  126. result := make(map[string]string)
  127. for _, rv := range itermaps {
  128. for k, v := range rv {
  129. result[k] = v
  130. }
  131. }
  132. return result
  133. }
  134. func returnLabels(cr *rabbitmqv1.Rabbitmq) map[string]string {
  135. labels := map[string]string{
  136. "app.improvado.io/application": "rabbitmq",
  137. "app.improvado.io/instance": cr.Name,
  138. }
  139. return labels
  140. }
  141. func returnAnnotationsPrometheus(cr *rabbitmqv1.Rabbitmq) map[string]string {
  142. return map[string]string{
  143. "prometheus.io/scrape": "true",
  144. "prometheus.io/port": strconv.Itoa(int(cr.Spec.RabbitmqPrometheusExporterPort)),
  145. }
  146. }
  147. func returnAnnotations(cr *rabbitmqv1.Rabbitmq) map[string]string {
  148. annotations := map[string]string{}
  149. if cr.Spec.RabbitmqPrometheusExporterPort > 0 {
  150. annotations = mergeMaps(annotations, returnAnnotationsPrometheus(cr))
  151. }
  152. return annotations
  153. }
  154. // Reconcile method
  155. func (r *ReconcileRabbitmq) Reconcile(request reconcile.Request) (reconcile.Result, error) {
  156. reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
  157. reqLogger.Info("Reconciling Rabbitmq v1")
  158. // Fetch the Rabbitmq instance
  159. instance := &rabbitmqv1.Rabbitmq{}
  160. err := r.client.Get(context.TODO(), request.NamespacedName, instance)
  161. if err != nil {
  162. if errors.IsNotFound(err) {
  163. // Request object not statefulsetFound, could have been deleted after reconcile request.
  164. // Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
  165. // Return and don't requeue
  166. return reconcile.Result{}, nil
  167. }
  168. // Error reading the object - requeue the request.
  169. raven.CaptureErrorAndWait(err, nil)
  170. return reconcile.Result{}, err
  171. }
  172. // secrets used for API requests and user control
  173. reqLogger.Info("Reconciling secrets")
  174. secretNames, err := r.reconcileSecrets(reqLogger, instance)
  175. if err != nil {
  176. raven.CaptureErrorAndWait(err, nil)
  177. return reconcile.Result{}, err
  178. }
  179. statefulset := newStatefulSet(instance, secretNames)
  180. if err := controllerutil.SetControllerReference(instance, statefulset, r.scheme); err != nil {
  181. raven.CaptureErrorAndWait(err, nil)
  182. return reconcile.Result{}, err
  183. }
  184. statefulsetFound := &v1.StatefulSet{}
  185. err = r.client.Get(context.TODO(), types.NamespacedName{Name: statefulset.Name, Namespace: statefulset.Namespace}, statefulsetFound)
  186. if err != nil && errors.IsNotFound(err) {
  187. reqLogger.Info("Creating a new statefulset", "statefulset.Namespace", statefulset.Namespace, "statefulset.Name", statefulset.Name)
  188. err = r.client.Create(context.TODO(), statefulset)
  189. if err != nil {
  190. raven.CaptureErrorAndWait(err, nil)
  191. return reconcile.Result{}, err
  192. }
  193. // statefulset created successfully - don't requeue
  194. return reconcile.Result{}, nil
  195. } else if err != nil {
  196. raven.CaptureErrorAndWait(err, nil)
  197. return reconcile.Result{}, err
  198. }
  199. if !reflect.DeepEqual(statefulsetFound.Spec, statefulset.Spec) {
  200. statefulsetFound.Spec.Replicas = statefulset.Spec.Replicas
  201. statefulsetFound.Spec.Template = statefulset.Spec.Template
  202. }
  203. if !reflect.DeepEqual(statefulsetFound.Annotations, statefulset.Annotations) {
  204. statefulsetFound.Annotations = statefulset.Annotations
  205. }
  206. if !reflect.DeepEqual(statefulsetFound.Labels, statefulset.Labels) {
  207. statefulsetFound.Labels = statefulset.Labels
  208. }
  209. reqLogger.Info("Reconcile statefulset", "statefulset.Namespace", statefulsetFound.Namespace, "statefulset.Name", statefulsetFound.Name)
  210. if err = r.client.Update(context.TODO(), statefulsetFound); err != nil {
  211. reqLogger.Info("Reconcile statefulset error", "statefulset.Namespace", statefulsetFound.Namespace, "statefulset.Name", statefulsetFound.Name)
  212. raven.CaptureErrorAndWait(err, nil)
  213. return reconcile.Result{}, err
  214. }
  215. // creating services
  216. reqLogger.Info("Reconciling services")
  217. _, err = r.reconcileHTTPService(reqLogger, instance)
  218. if err != nil {
  219. raven.CaptureErrorAndWait(err, nil)
  220. return reconcile.Result{}, err
  221. }
  222. // all-in-one service
  223. _, err = r.reconcileHAService(reqLogger, instance)
  224. if err != nil {
  225. raven.CaptureErrorAndWait(err, nil)
  226. return reconcile.Result{}, err
  227. }
  228. _, err = r.reconcileDiscoveryService(reqLogger, instance)
  229. if err != nil {
  230. raven.CaptureErrorAndWait(err, nil)
  231. return reconcile.Result{}, err
  232. }
  233. // configmap
  234. reqLogger.Info("Reconciling configmap")
  235. _, err = r.reconcileConfigMap(reqLogger, instance, secretNames)
  236. if err != nil {
  237. raven.CaptureErrorAndWait(err, nil)
  238. return reconcile.Result{}, err
  239. }
  240. // check prometheus exporter flag
  241. if instance.Spec.RabbitmqPrometheusExporterPort > 0 {
  242. _, err = r.reconcilePrometheusExporterService(reqLogger, instance)
  243. if err != nil {
  244. raven.CaptureErrorAndWait(err, nil)
  245. return reconcile.Result{}, err
  246. }
  247. }
  248. // use ServiceMonitor?
  249. if instance.Spec.RabbitmqUseServiceMonitor {
  250. _, err = r.reconcilePrometheusExporterServiceMonitor(reqLogger, instance)
  251. if err != nil {
  252. raven.CaptureErrorAndWait(err, nil)
  253. return reconcile.Result{}, err
  254. }
  255. }
  256. // set policies
  257. reqLogger.Info("Setting up policies")
  258. timeoutPolicies, _ := time.ParseDuration("30")
  259. timeoutFlagPolicies := false
  260. ctxPolicies, ctxPoliciesCancelTimeout := context.WithTimeout(context.Background(), timeoutPolicies)
  261. defer ctxPoliciesCancelTimeout()
  262. go r.setPolicies(ctxPolicies, reqLogger, instance, secretNames)
  263. for {
  264. if timeoutFlagPolicies {
  265. break
  266. }
  267. select {
  268. case <-ctxPolicies.Done():
  269. timeoutFlagPolicies = true
  270. }
  271. }
  272. // set additional users
  273. reqLogger.Info("Setting up additional users")
  274. timeoutUsers, _ := time.ParseDuration("30")
  275. timeoutFlagUsers := false
  276. ctxUsers, ctxUsersCancelTimeout := context.WithTimeout(context.Background(), timeoutUsers)
  277. defer ctxUsersCancelTimeout()
  278. go r.syncUsersCredentials(ctxUsers, reqLogger, instance, secretNames)
  279. for {
  280. if timeoutFlagUsers {
  281. break
  282. }
  283. select {
  284. case <-ctxUsers.Done():
  285. timeoutFlagUsers = true
  286. }
  287. }
  288. _, err = r.reconcileFinalizers(reqLogger, instance)
  289. if err != nil {
  290. raven.CaptureErrorAndWait(err, nil)
  291. return reconcile.Result{}, err
  292. }
  293. return reconcile.Result{}, nil
  294. }
  295. func appendNodeVariables(env []corev1.EnvVar, cr *rabbitmqv1.Rabbitmq) []corev1.EnvVar {
  296. return append(env,
  297. corev1.EnvVar{
  298. Name: "RABBITMQ_USE_LONGNAME",
  299. Value: "true",
  300. },
  301. corev1.EnvVar{
  302. Name: "K8S_HOSTNAME_SUFFIX",
  303. Value: "." + cr.Name + "-discovery." + cr.Namespace + "." + cr.Spec.RabbitmqK8SServiceDiscovery,
  304. },
  305. corev1.EnvVar{
  306. Name: "K8S_SERVICE_NAME",
  307. Value: cr.Name + "-discovery",
  308. },
  309. corev1.EnvVar{
  310. Name: "MY_POD_NAME",
  311. ValueFrom: &corev1.EnvVarSource{
  312. FieldRef: &corev1.ObjectFieldSelector{
  313. FieldPath: "metadata.name",
  314. },
  315. },
  316. },
  317. corev1.EnvVar{
  318. Name: "RABBITMQ_NODENAME",
  319. Value: "rabbit@$(MY_POD_NAME)." + cr.Name + "-discovery." + cr.Namespace + "." + cr.Spec.RabbitmqK8SServiceDiscovery,
  320. },
  321. )
  322. }
  323. func newStatefulSet(cr *rabbitmqv1.Rabbitmq, secretNames secretResouces) *v1.StatefulSet {
  324. // prepare containers for pod
  325. podContainers := []corev1.Container{}
  326. // check affinity rules
  327. affinity := &corev1.Affinity{}
  328. if cr.Spec.RabbitmqAffinity != nil {
  329. affinity = cr.Spec.RabbitmqAffinity
  330. }
  331. // container with rabbitmq
  332. rabbitmqContainer := corev1.Container{
  333. Name: "rabbitmq",
  334. Image: cr.Spec.K8SImage.Name + ":" + cr.Spec.K8SImage.Tag,
  335. Env: append(appendNodeVariables(cr.Spec.K8SENV, cr), corev1.EnvVar{
  336. Name: "RABBITMQ_ERLANG_COOKIE",
  337. ValueFrom: &corev1.EnvVarSource{ConfigMapKeyRef: &corev1.ConfigMapKeySelector{LocalObjectReference: corev1.LocalObjectReference{Name: cr.Name}, Key: ".erlang.cookie"}},
  338. }),
  339. Resources: corev1.ResourceRequirements{
  340. Requests: cr.Spec.RabbitmqPodRequests,
  341. Limits: cr.Spec.RabbitmqPodLimits,
  342. },
  343. VolumeMounts: []corev1.VolumeMount{
  344. {
  345. Name: "rabbit-etc",
  346. MountPath: "/etc/rabbitmq",
  347. },
  348. {
  349. Name: "rabbit-data",
  350. MountPath: "/var/lib/rabbitmq",
  351. },
  352. {
  353. Name: "rabbit-config",
  354. MountPath: "/rabbit-config",
  355. },
  356. },
  357. }
  358. podContainers = append(podContainers, rabbitmqContainer)
  359. // if prometheus exporter enabled add additional container to pod
  360. if cr.Spec.RabbitmqPrometheusExporterPort > 0 {
  361. exporterImageAndTag := "kbudde/rabbitmq-exporter:latest"
  362. if cr.Spec.RabbitmqPrometheusImage != "" {
  363. exporterImageAndTag = cr.Spec.RabbitmqPrometheusImage
  364. }
  365. exporterContainer := corev1.Container{
  366. Name: "prometheus-exporter",
  367. Image: exporterImageAndTag,
  368. ImagePullPolicy: corev1.PullIfNotPresent,
  369. Env: []corev1.EnvVar{
  370. {
  371. Name: "RABBIT_USER",
  372. ValueFrom: &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{LocalObjectReference: corev1.LocalObjectReference{Name: secretNames.ServiceAccount}, Key: "username"}},
  373. },
  374. {
  375. Name: "RABBIT_PASSWORD",
  376. ValueFrom: &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{LocalObjectReference: corev1.LocalObjectReference{Name: secretNames.ServiceAccount}, Key: "password"}},
  377. },
  378. },
  379. Ports: []corev1.ContainerPort{
  380. {
  381. Name: "exporter",
  382. Protocol: corev1.ProtocolTCP,
  383. ContainerPort: cr.Spec.RabbitmqPrometheusExporterPort,
  384. },
  385. },
  386. }
  387. podContainers = append(podContainers, exporterContainer)
  388. }
  389. podTemplate := corev1.PodTemplateSpec{
  390. ObjectMeta: metav1.ObjectMeta{
  391. Labels: mergeMaps(returnLabels(cr),
  392. map[string]string{"app.improvado.io/component": "messaging"},
  393. ),
  394. Annotations: returnAnnotations(cr),
  395. },
  396. Spec: corev1.PodSpec{
  397. Affinity: affinity,
  398. ServiceAccountName: cr.Spec.RabbitmqK8SServiceAccount,
  399. InitContainers: []corev1.Container{
  400. {
  401. Name: "copy-rabbitmq-config",
  402. Image: "busybox",
  403. Command: []string{"sh", "/rabbit-config/init.sh"},
  404. VolumeMounts: []corev1.VolumeMount{
  405. {
  406. Name: "rabbit-etc",
  407. MountPath: "/etc/rabbitmq",
  408. },
  409. {
  410. Name: "rabbit-config",
  411. MountPath: "/rabbit-config",
  412. },
  413. {
  414. Name: "rabbit-data",
  415. MountPath: "/var/lib/rabbitmq",
  416. },
  417. },
  418. },
  419. },
  420. Containers: podContainers,
  421. Tolerations: cr.Spec.Tolerations,
  422. NodeSelector: cr.Spec.NodeSelector,
  423. Volumes: []corev1.Volume{
  424. {
  425. Name: "rabbit-config",
  426. VolumeSource: corev1.VolumeSource{
  427. ConfigMap: &corev1.ConfigMapVolumeSource{
  428. LocalObjectReference: corev1.LocalObjectReference{
  429. Name: cr.Name,
  430. },
  431. },
  432. },
  433. },
  434. {
  435. Name: "rabbit-etc",
  436. VolumeSource: corev1.VolumeSource{
  437. EmptyDir: &corev1.EmptyDirVolumeSource{},
  438. },
  439. },
  440. },
  441. },
  442. }
  443. PVCTemplate := corev1.PersistentVolumeClaim{
  444. ObjectMeta: metav1.ObjectMeta{
  445. Name: "rabbit-data",
  446. Finalizers: cr.ObjectMeta.Finalizers,
  447. },
  448. Spec: corev1.PersistentVolumeClaimSpec{
  449. AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
  450. Resources: corev1.ResourceRequirements{
  451. Requests: corev1.ResourceList{
  452. corev1.ResourceStorage: cr.Spec.RabbitmqVolumeSize,
  453. },
  454. },
  455. },
  456. }
  457. return &v1.StatefulSet{
  458. ObjectMeta: metav1.ObjectMeta{
  459. Name: cr.Name,
  460. Namespace: cr.Namespace,
  461. Labels: mergeMaps(returnLabels(cr),
  462. map[string]string{"app.improvado.io/component": "messaging"},
  463. ),
  464. },
  465. Spec: v1.StatefulSetSpec{
  466. Replicas: &cr.Spec.RabbitmqReplicas,
  467. ServiceName: cr.Name + "-discovery",
  468. Selector: &metav1.LabelSelector{
  469. MatchLabels: returnLabels(cr),
  470. },
  471. Template: podTemplate,
  472. VolumeClaimTemplates: []corev1.PersistentVolumeClaim{PVCTemplate},
  473. UpdateStrategy: v1.StatefulSetUpdateStrategy{
  474. Type: v1.RollingUpdateStatefulSetStrategyType,
  475. },
  476. },
  477. }
  478. }