Redis operator for Kubernetes with HAProxy support
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

500 lines
16 KiB

  1. package redis
import (
	"context"
	"fmt"
	"net"
	"os"
	"sort"

	blindagev1alpha1 "git.blindage.org/21h/redis-operator/pkg/apis/blindage/v1alpha1"
	"git.blindage.org/21h/redis-operator/pkg/controller/manifests"
	raven "github.com/getsentry/raven-go"
	rediscli "github.com/go-redis/redis"
	v1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/api/policy/v1beta1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/intstr"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
	logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
	"sigs.k8s.io/controller-runtime/pkg/source"
)
  27. var log = logf.Log.WithName("controller_redis")
  28. func init() {
  29. if os.Getenv("SENTRY_DSN") != "" {
  30. raven.SetDSN(os.Getenv("SENTRY_DSN"))
  31. }
  32. }
  33. func Add(mgr manager.Manager) error {
  34. return add(mgr, newReconciler(mgr))
  35. }
  36. // newReconciler returns a new reconcile.Reconciler
  37. func newReconciler(mgr manager.Manager) reconcile.Reconciler {
  38. return &ReconcileRedis{client: mgr.GetClient(), scheme: mgr.GetScheme()}
  39. }
  40. // add adds a new Controller to mgr with r as the reconcile.Reconciler
  41. func add(mgr manager.Manager, r reconcile.Reconciler) error {
  42. // Create a new controller
  43. c, err := controller.New("redis-controller", mgr, controller.Options{Reconciler: r})
  44. if err != nil {
  45. return err
  46. }
  47. // Watch for changes to primary resource Redis
  48. err = c.Watch(&source.Kind{Type: &blindagev1alpha1.Redis{}}, &handler.EnqueueRequestForObject{})
  49. if err != nil {
  50. return err
  51. }
  52. // Watch for changes to secondary resource Pods and requeue the owner Redis
  53. err = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForOwner{
  54. IsController: true,
  55. OwnerType: &blindagev1alpha1.Redis{},
  56. })
  57. if err != nil {
  58. return err
  59. }
  60. err = c.Watch(&source.Kind{Type: &v1.StatefulSet{}}, &handler.EnqueueRequestForOwner{
  61. IsController: true,
  62. OwnerType: &blindagev1alpha1.Redis{},
  63. })
  64. if err != nil {
  65. return err
  66. }
  67. return nil
  68. }
  69. // blank assignment to verify that ReconcileRedis implements reconcile.Reconciler
  70. var _ reconcile.Reconciler = &ReconcileRedis{}
  71. // ReconcileRedis reconciles a Redis object
  72. type ReconcileRedis struct {
  73. client client.Client
  74. scheme *runtime.Scheme
  75. }
  76. // Reconcile means magic begins
  77. func (r *ReconcileRedis) Reconcile(request reconcile.Request) (reconcile.Result, error) {
  78. reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
  79. reqLogger.Info("Reconciling Redis")
  80. // Fetch the Redis instance
  81. instance := &blindagev1alpha1.Redis{}
  82. err := r.client.Get(context.TODO(), request.NamespacedName, instance)
  83. if err != nil {
  84. if errors.IsNotFound(err) {
  85. return reconcile.Result{}, nil
  86. }
  87. // Error reading the object - requeue the request.
  88. return reconcile.Result{}, err
  89. }
  90. _, err = r.reconcileFinalizers(reqLogger, instance)
  91. if err != nil {
  92. raven.CaptureErrorAndWait(err, nil)
  93. return reconcile.Result{}, err
  94. }
  95. // Prepare Sentinel config
  96. configSentinelName := instance.Name + "-sentinel"
  97. configSentinelTemplate := `
  98. sentinel monitor redismaster %v 6379 %v
  99. sentinel down-after-milliseconds redismaster 1000
  100. sentinel failover-timeout redismaster 3000
  101. sentinel parallel-syncs redismaster 2
  102. `
  103. configSentinelData := map[string]string{"sentinel.conf": fmt.Sprintf(configSentinelTemplate, instance.Name+"-sentinel", instance.Spec.Quorum)}
  104. if _, err := r.ReconcileConfigmap(reqLogger, instance, configSentinelName, configSentinelData); err != nil {
  105. return reconcile.Result{}, err
  106. }
  107. configRedisName := instance.Name + "-redis"
  108. configRedisData := map[string]string{"redis.conf": `
  109. slaveof 127.0.0.1 6379
  110. tcp-keepalive 60
  111. save 900 1
  112. save 300 10
  113. `}
  114. if _, err := r.ReconcileConfigmap(reqLogger, instance, configRedisName, configRedisData); err != nil {
  115. return reconcile.Result{}, err
  116. }
  117. configFailoverName := instance.Name + "-failover"
  118. configFailoverData := map[string]string{"failover.sh": `
  119. MASTER_HOST=$(redis-cli -h ${SENTINEL_SERVICE} -p 26379 --csv SENTINEL get-master-addr-by-name redismaster | tr ',' ' ' | tr -d '\"' |cut -d' ' -f1)
  120. if [[ ${MASTER_HOST} == $(hostname -i) ]]; then
  121. redis-cli -h ${SENTINEL_SERVICE} -p 26379 SENTINEL failover redismaster
  122. fi
  123. `}
  124. if _, err := r.ReconcileConfigmap(reqLogger, instance, configFailoverName, configFailoverData); err != nil {
  125. return reconcile.Result{}, err
  126. }
  127. // reconcile Sentinel deployment
  128. newSentinelDeployment := manifests.GenerateDeployment(instance)
  129. if _, err := r.ReconcileDeployment(reqLogger, instance, newSentinelDeployment); err != nil {
  130. return reconcile.Result{}, err
  131. }
  132. // reconcile Redis StatefulSet
  133. newRedisStatefulset := manifests.GenerateStatefulSet(instance)
  134. if _, err := r.ReconcileStatefulSet(reqLogger, instance, newRedisStatefulset); err != nil {
  135. return reconcile.Result{}, err
  136. }
  137. // create sentinel and redis services
  138. serviceName := instance.Name + "-sentinel"
  139. servicePortName := "sentinel"
  140. servicePort := int32(26379)
  141. serviceSelector := map[string]string{"component": "sentinel"}
  142. if _, err := r.ReconcileService(reqLogger, instance, serviceName, map[string]int32{servicePortName: servicePort}, serviceSelector); err != nil {
  143. return reconcile.Result{}, err
  144. }
  145. serviceName = instance.Name + "-redis"
  146. servicePortName = "redis"
  147. servicePort = int32(6379)
  148. serviceSelector = map[string]string{"component": "redis"}
  149. if _, err := r.ReconcileService(reqLogger, instance, serviceName, map[string]int32{servicePortName: servicePort}, serviceSelector); err != nil {
  150. return reconcile.Result{}, err
  151. }
  152. // create PDB resources
  153. if instance.Spec.PdbRedis != nil {
  154. pdbName := instance.Name + "-redis"
  155. pdbSpec := v1beta1.PodDisruptionBudgetSpec{
  156. Selector: &metav1.LabelSelector{
  157. MatchLabels: manifests.MergeLabels(manifests.BaseLabels(instance), map[string]string{"component": "redis"}),
  158. },
  159. }
  160. if instance.Spec.PdbRedis.MaxUnavailable != nil {
  161. pdbSpec.MaxUnavailable = &intstr.IntOrString{Type: intstr.Int, IntVal: *instance.Spec.PdbRedis.MaxUnavailable}
  162. }
  163. if instance.Spec.PdbRedis.MinAvailable != nil {
  164. pdbSpec.MinAvailable = &intstr.IntOrString{Type: intstr.Int, IntVal: *instance.Spec.PdbRedis.MinAvailable}
  165. }
  166. if _, err := r.ReconcilePodDisruptionBudget(reqLogger, instance, pdbName, pdbSpec); err != nil {
  167. return reconcile.Result{}, err
  168. }
  169. } else {
  170. pdb := v1beta1.PodDisruptionBudget{
  171. ObjectMeta: metav1.ObjectMeta{
  172. Name: instance.Name + "-redis",
  173. Namespace: instance.Namespace,
  174. },
  175. Spec: v1beta1.PodDisruptionBudgetSpec{},
  176. }
  177. err := r.client.Delete(context.TODO(), &pdb)
  178. if err != nil && !errors.IsNotFound(err) {
  179. raven.CaptureErrorAndWait(err, nil)
  180. reqLogger.Info("PodDisruptionBudget deletion error", "Namespace", pdb.Namespace, "Name", pdb.Name, "Error", err)
  181. return reconcile.Result{}, err
  182. }
  183. }
  184. if instance.Spec.PdbSentinel != nil {
  185. pdbName := instance.Name + "-sentinel"
  186. pdbSpec := v1beta1.PodDisruptionBudgetSpec{
  187. Selector: &metav1.LabelSelector{
  188. MatchLabels: manifests.MergeLabels(manifests.BaseLabels(instance), map[string]string{"component": "sentinel"}),
  189. },
  190. }
  191. if instance.Spec.PdbSentinel.MaxUnavailable != nil {
  192. // adorable if MaxUnavailable < (SentinelReplicas/2), just to save quorum
  193. if *instance.Spec.PdbSentinel.MaxUnavailable > (*instance.Spec.SentinelReplicas / 2) {
  194. reqLogger.Error(err, "Sentinel MaxUnavailable must be lesser then sentinelReplicas/2 to save quorum", "Namespace", instance.Namespace, "Name", instance.Name)
  195. return reconcile.Result{}, err
  196. }
  197. pdbSpec.MaxUnavailable = &intstr.IntOrString{Type: intstr.Int, IntVal: *instance.Spec.PdbSentinel.MaxUnavailable}
  198. }
  199. if instance.Spec.PdbSentinel.MinAvailable != nil {
  200. // adorable if MinAvailable > (SentinelReplicas/2), just to save quorum
  201. if *instance.Spec.PdbSentinel.MinAvailable < (*instance.Spec.SentinelReplicas / 2) {
  202. reqLogger.Error(err, "Sentinel MinAvailable must be greater then sentinelReplicas/2 to save quorum", "Namespace", instance.Namespace, "Name", instance.Name)
  203. return reconcile.Result{}, err
  204. }
  205. pdbSpec.MinAvailable = &intstr.IntOrString{Type: intstr.Int, IntVal: *instance.Spec.PdbSentinel.MinAvailable}
  206. }
  207. if _, err := r.ReconcilePodDisruptionBudget(reqLogger, instance, pdbName, pdbSpec); err != nil {
  208. return reconcile.Result{}, err
  209. }
  210. } else {
  211. pdb := v1beta1.PodDisruptionBudget{
  212. ObjectMeta: metav1.ObjectMeta{
  213. Name: instance.Name + "-sentinel",
  214. Namespace: instance.Namespace,
  215. },
  216. Spec: v1beta1.PodDisruptionBudgetSpec{},
  217. }
  218. err := r.client.Delete(context.TODO(), &pdb)
  219. if err != nil && !errors.IsNotFound(err) {
  220. raven.CaptureErrorAndWait(err, nil)
  221. reqLogger.Info("PodDisruptionBudget deletion error", "Namespace", pdb.Namespace, "Name", pdb.Name, "Error", err)
  222. return reconcile.Result{}, err
  223. }
  224. }
  225. if instance.Spec.PdbHaproxy != nil {
  226. pdbName := instance.Name + "-haproxy"
  227. pdbSpec := v1beta1.PodDisruptionBudgetSpec{
  228. Selector: &metav1.LabelSelector{
  229. MatchLabels: manifests.MergeLabels(manifests.BaseLabels(instance), map[string]string{"component": "haproxy"}),
  230. },
  231. }
  232. if instance.Spec.PdbHaproxy.MaxUnavailable != nil {
  233. pdbSpec.MaxUnavailable = &intstr.IntOrString{Type: intstr.Int, IntVal: *instance.Spec.PdbHaproxy.MaxUnavailable}
  234. }
  235. if instance.Spec.PdbHaproxy.MinAvailable != nil {
  236. pdbSpec.MinAvailable = &intstr.IntOrString{Type: intstr.Int, IntVal: *instance.Spec.PdbHaproxy.MinAvailable}
  237. }
  238. if _, err := r.ReconcilePodDisruptionBudget(reqLogger, instance, pdbName, pdbSpec); err != nil {
  239. return reconcile.Result{}, err
  240. }
  241. } else {
  242. pdb := v1beta1.PodDisruptionBudget{
  243. ObjectMeta: metav1.ObjectMeta{
  244. Name: instance.Name + "-haproxy",
  245. Namespace: instance.Namespace,
  246. },
  247. Spec: v1beta1.PodDisruptionBudgetSpec{},
  248. }
  249. err := r.client.Delete(context.TODO(), &pdb)
  250. if err != nil && !errors.IsNotFound(err) {
  251. raven.CaptureErrorAndWait(err, nil)
  252. reqLogger.Info("PodDisruptionBudget deletion error", "Namespace", pdb.Namespace, "Name", pdb.Name, "Error", err)
  253. return reconcile.Result{}, err
  254. }
  255. }
  256. // set Redis master
  257. podList := &corev1.PodList{}
  258. labelSelector := labels.SelectorFromSet(newRedisStatefulset.Labels)
  259. listOpts := &client.ListOptions{
  260. Namespace: newRedisStatefulset.Namespace,
  261. LabelSelector: labelSelector,
  262. }
  263. err = r.client.List(context.TODO(), listOpts, podList)
  264. if err != nil {
  265. reqLogger.Error(err, "Failed to list Pods.", "Namespace", instance.Namespace, "Name", instance.Name)
  266. return reconcile.Result{}, err
  267. }
  268. if len(podList.Items) < 1 {
  269. reqLogger.Error(err, "Pods < 0", "Namespace", instance.Namespace, "Name", instance.Name)
  270. return reconcile.Result{}, err
  271. }
  272. // Order the pods so we start by the oldest one
  273. sort.Slice(podList.Items, func(i, j int) bool {
  274. return podList.Items[i].CreationTimestamp.Before(&podList.Items[j].CreationTimestamp)
  275. })
  276. newMasterIP := ""
  277. podIPs := []string{}
  278. for _, pod := range podList.Items {
  279. // pod will be deleted, skip
  280. if pod.GetObjectMeta().GetDeletionTimestamp() != nil {
  281. continue
  282. }
  283. if pod.Status.Phase == corev1.PodPending || pod.Status.Phase == corev1.PodRunning {
  284. // for haproxy if enabled
  285. podIPs = append(podIPs, pod.Status.PodIP)
  286. if newMasterIP == "" {
  287. newMasterIP = pod.Status.PodIP
  288. reqLogger.Info("New master ip", newMasterIP, instance.Namespace, "Name", instance.Name)
  289. if err := querySetMaster(newMasterIP); err != nil {
  290. reqLogger.Error(err, "Error! New master ip", newMasterIP, instance.Namespace, "Name", instance.Name)
  291. return reconcile.Result{}, err
  292. }
  293. } else {
  294. reqLogger.Info("Redis", pod.Name, "slaveof", newMasterIP, instance.Namespace, "Name", instance.Name)
  295. if err := querySetSlaveOf(pod.Status.PodIP, newMasterIP); err != nil {
  296. reqLogger.Error(err, "Error! Redis", pod.Name, "slaveof", newMasterIP, instance.Namespace, "Name", instance.Name)
  297. return reconcile.Result{}, err
  298. }
  299. }
  300. }
  301. }
  302. // haproxy
  303. // check if you need haproxy
  304. if instance.Spec.UseHAProxy {
  305. configHaproxyShepherdName := instance.Name + "-haproxy-shepherd"
  306. configHaproxyShepherdData := map[string]string{"shepherd.sh": `
  307. #!/bin/sh
  308. echo "Start"
  309. MONFILE='/usr/local/etc/haproxy/haproxy.cfg'
  310. PIDFILE='/run/haproxy.pid'
  311. MD5FILE='/tmp/haproxy.cfg.md5'
  312. touch ${MD5FILE}
  313. while true
  314. do
  315. MD5LAST="$(cat ${MD5FILE})"
  316. echo "Read MD5 of ${MD5FILE}: ${MD5LAST}"
  317. if [ -z "${MD5LAST}" ]
  318. then
  319. echo "First time check, md5 file is empty"
  320. echo "$(md5sum ${MONFILE})" > ${MD5FILE}
  321. else
  322. echo "Get md5 and compare with last time"
  323. MD5CURRENT="$(md5sum ${MONFILE})"
  324. if [ "${MD5CURRENT}" != "${MD5LAST}" ]
  325. then
  326. echo "Send signal to haproxy"
  327. kill -HUP $(cat ${PIDFILE})
  328. echo "${MD5CURRENT}" > ${MD5FILE}
  329. fi
  330. fi
  331. # sleep 5 seconds, it will be enough to not disturb haproxy while pods rapidly creates or dies
  332. sleep 5
  333. done
  334. `}
  335. if _, err := r.ReconcileConfigmap(reqLogger, instance, configHaproxyShepherdName, configHaproxyShepherdData); err != nil {
  336. return reconcile.Result{}, err
  337. }
  338. redisEndpointTemplate := " server redis_backend_%v %v:6379 maxconn 1024 check inter %vs\n"
  339. redisEndpoints := ""
  340. haproxyBackendCheckInterval := 1
  341. if instance.Spec.HAProxyBackendCheckInterval > 0 {
  342. haproxyBackendCheckInterval = instance.Spec.HAProxyBackendCheckInterval
  343. }
  344. for num, ip := range podIPs {
  345. redisEndpoints = redisEndpoints + fmt.Sprintf(redisEndpointTemplate, num, ip, haproxyBackendCheckInterval)
  346. }
  347. configHaproxyConfigName := instance.Name + "-haproxy"
  348. configHaproxyConfigData := map[string]string{"haproxy.cfg": `
  349. global
  350. pidfile /run/haproxy.pid
  351. defaults
  352. mode tcp
  353. timeout connect 5s
  354. timeout server %vs
  355. timeout client %vs
  356. option tcpka
  357. listen stats
  358. mode http
  359. bind :9000
  360. stats enable
  361. stats hide-version
  362. stats realm Haproxy\ Statistics
  363. stats uri /haproxy_stats
  364. frontend ft_redis
  365. mode tcp
  366. bind *:6379
  367. default_backend bk_redis
  368. backend bk_redis
  369. mode tcp
  370. option tcp-check
  371. tcp-check send PING\r\n
  372. tcp-check expect string +PONG
  373. tcp-check send info\ replication\r\n
  374. tcp-check expect string role:master
  375. tcp-check send QUIT\r\n
  376. tcp-check expect string +OK
  377. `}
  378. haproxyTimeoutServer := 30
  379. if instance.Spec.HAProxyTimeoutServer > 0 {
  380. haproxyTimeoutServer = instance.Spec.HAProxyTimeoutServer
  381. }
  382. haproxyTimeoutClient := 30
  383. if instance.Spec.HAProxyTimeoutClient > 0 {
  384. haproxyTimeoutClient = instance.Spec.HAProxyTimeoutClient
  385. }
  386. configHaproxyConfigData["haproxy.cfg"] = fmt.Sprintf(configHaproxyConfigData["haproxy.cfg"], haproxyTimeoutServer, haproxyTimeoutClient) + redisEndpoints
  387. if _, err := r.ReconcileConfigmap(reqLogger, instance, configHaproxyConfigName, configHaproxyConfigData); err != nil {
  388. return reconcile.Result{}, err
  389. }
  390. // reconcile HAProxy deployment
  391. newHAProxyDeployment := manifests.GenerateHaproxyDeployment(instance)
  392. if _, err := r.ReconcileDeployment(reqLogger, instance, newHAProxyDeployment); err != nil {
  393. return reconcile.Result{}, err
  394. }
  395. // create haproxy service
  396. serviceName = instance.Name + "-haproxy"
  397. servicePortName = "haproxy"
  398. servicePort = int32(6379)
  399. serviceSelector = map[string]string{"component": "haproxy"}
  400. if _, err := r.ReconcileService(reqLogger, instance, serviceName, map[string]int32{servicePortName: servicePort, "stats": 9000}, serviceSelector); err != nil {
  401. return reconcile.Result{}, err
  402. }
  403. }
  404. reqLogger.Info("Reconcile complete", "Namespace", instance.Namespace, "Name", instance.Name)
  405. return reconcile.Result{}, nil
  406. }
  407. func querySetMaster(ip string) error {
  408. options := &rediscli.Options{
  409. Addr: fmt.Sprintf("%s:%s", ip, "6379"),
  410. Password: "",
  411. DB: 0,
  412. }
  413. rClient := rediscli.NewClient(options)
  414. defer rClient.Close()
  415. if res := rClient.SlaveOf("NO", "ONE"); res.Err() != nil {
  416. return res.Err()
  417. }
  418. return nil
  419. }
  420. func querySetSlaveOf(ip string, masterIP string) error {
  421. options := &rediscli.Options{
  422. Addr: fmt.Sprintf("%s:%s", ip, "6379"),
  423. Password: "",
  424. DB: 0,
  425. }
  426. rClient := rediscli.NewClient(options)
  427. defer rClient.Close()
  428. if res := rClient.SlaveOf(masterIP, "6379"); res.Err() != nil {
  429. return res.Err()
  430. }
  431. return nil
  432. }