Redis operator for Kubernetes with HAProxy support
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

375 lines
11 KiB

  1. package redis
import (
	"context"
	"fmt"
	"net"
	"os"
	"sort"

	blindagev1alpha1 "git.blindage.org/21h/redis-operator/pkg/apis/blindage/v1alpha1"
	"git.blindage.org/21h/redis-operator/pkg/controller/manifests"
	raven "github.com/getsentry/raven-go"
	rediscli "github.com/go-redis/redis"
	v1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
	logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
	"sigs.k8s.io/controller-runtime/pkg/source"
)
  24. var log = logf.Log.WithName("controller_redis")
  25. func init() {
  26. if os.Getenv("SENTRY_DSN") != "" {
  27. raven.SetDSN(os.Getenv("SENTRY_DSN"))
  28. }
  29. }
  30. func Add(mgr manager.Manager) error {
  31. return add(mgr, newReconciler(mgr))
  32. }
  33. // newReconciler returns a new reconcile.Reconciler
  34. func newReconciler(mgr manager.Manager) reconcile.Reconciler {
  35. return &ReconcileRedis{client: mgr.GetClient(), scheme: mgr.GetScheme()}
  36. }
  37. // add adds a new Controller to mgr with r as the reconcile.Reconciler
  38. func add(mgr manager.Manager, r reconcile.Reconciler) error {
  39. // Create a new controller
  40. c, err := controller.New("redis-controller", mgr, controller.Options{Reconciler: r})
  41. if err != nil {
  42. return err
  43. }
  44. // Watch for changes to primary resource Redis
  45. err = c.Watch(&source.Kind{Type: &blindagev1alpha1.Redis{}}, &handler.EnqueueRequestForObject{})
  46. if err != nil {
  47. return err
  48. }
  49. // Watch for changes to secondary resource Pods and requeue the owner Redis
  50. err = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForOwner{
  51. IsController: true,
  52. OwnerType: &blindagev1alpha1.Redis{},
  53. })
  54. if err != nil {
  55. return err
  56. }
  57. err = c.Watch(&source.Kind{Type: &v1.StatefulSet{}}, &handler.EnqueueRequestForOwner{
  58. IsController: true,
  59. OwnerType: &blindagev1alpha1.Redis{},
  60. })
  61. if err != nil {
  62. return err
  63. }
  64. return nil
  65. }
  66. // blank assignment to verify that ReconcileRedis implements reconcile.Reconciler
  67. var _ reconcile.Reconciler = &ReconcileRedis{}
  68. // ReconcileRedis reconciles a Redis object
  69. type ReconcileRedis struct {
  70. client client.Client
  71. scheme *runtime.Scheme
  72. }
  73. // Reconcile means magic begins
  74. func (r *ReconcileRedis) Reconcile(request reconcile.Request) (reconcile.Result, error) {
  75. reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
  76. reqLogger.Info("Reconciling Redis")
  77. // Fetch the Redis instance
  78. instance := &blindagev1alpha1.Redis{}
  79. err := r.client.Get(context.TODO(), request.NamespacedName, instance)
  80. if err != nil {
  81. if errors.IsNotFound(err) {
  82. return reconcile.Result{}, nil
  83. }
  84. // Error reading the object - requeue the request.
  85. return reconcile.Result{}, err
  86. }
  87. // Prepare Sentinel config
  88. configSentinelName := instance.Name + "-sentinel"
  89. configSentinelTemplate := `
  90. sentinel monitor redismaster %v 6379 %v
  91. sentinel down-after-milliseconds redismaster 1000
  92. sentinel failover-timeout redismaster 3000
  93. sentinel parallel-syncs redismaster 2
  94. `
  95. configSentinelData := map[string]string{"sentinel.conf": fmt.Sprintf(configSentinelTemplate, instance.Name+"-sentinel", instance.Spec.Quorum)}
  96. if _, err := r.ReconcileConfigmap(reqLogger, instance, configSentinelName, configSentinelData); err != nil {
  97. return reconcile.Result{}, err
  98. }
  99. configRedisName := instance.Name + "-redis"
  100. configRedisData := map[string]string{"redis.conf": `
  101. slaveof 127.0.0.1 6379
  102. tcp-keepalive 60
  103. save 900 1
  104. save 300 10
  105. `}
  106. if _, err := r.ReconcileConfigmap(reqLogger, instance, configRedisName, configRedisData); err != nil {
  107. return reconcile.Result{}, err
  108. }
  109. configFailoverName := instance.Name + "-failover"
  110. configFailoverData := map[string]string{"failover.sh": `
  111. MASTER_HOST=$(redis-cli -h ${SENTINEL_SERVICE} -p 26379 --csv SENTINEL get-master-addr-by-name redismaster | tr ',' ' ' | tr -d '\"' |cut -d' ' -f1)
  112. if [[ ${MASTER_HOST} == $(hostname -i) ]]; then
  113. redis-cli -h ${SENTINEL_SERVICE} -p 26379 SENTINEL failover redismaster
  114. fi
  115. `}
  116. if _, err := r.ReconcileConfigmap(reqLogger, instance, configFailoverName, configFailoverData); err != nil {
  117. return reconcile.Result{}, err
  118. }
  119. // reconcile Sentinel deployment
  120. newSentinelDeployment := manifests.GenerateDeployment(instance)
  121. if _, err := r.ReconcileDeployment(reqLogger, instance, newSentinelDeployment); err != nil {
  122. return reconcile.Result{}, err
  123. }
  124. // reconcile Redis StatefulSet
  125. newRedisStatefulset := manifests.GenerateStatefulSet(instance)
  126. if _, err := r.ReconcileStatefulSet(reqLogger, instance, newRedisStatefulset); err != nil {
  127. return reconcile.Result{}, err
  128. }
  129. // create sentinel and redis services
  130. serviceName := instance.Name + "-sentinel"
  131. servicePortName := "sentinel"
  132. servicePort := int32(26379)
  133. serviceSelector := map[string]string{"component": "sentinel"}
  134. if _, err := r.ReconcileService(reqLogger, instance, serviceName, map[string]int32{servicePortName: servicePort}, serviceSelector); err != nil {
  135. return reconcile.Result{}, err
  136. }
  137. serviceName = instance.Name + "-redis"
  138. servicePortName = "redis"
  139. servicePort = int32(6379)
  140. serviceSelector = map[string]string{"component": "redis"}
  141. if _, err := r.ReconcileService(reqLogger, instance, serviceName, map[string]int32{servicePortName: servicePort}, serviceSelector); err != nil {
  142. return reconcile.Result{}, err
  143. }
  144. // set Redis master
  145. podList := &corev1.PodList{}
  146. labelSelector := labels.SelectorFromSet(newRedisStatefulset.Labels)
  147. listOpts := &client.ListOptions{
  148. Namespace: newRedisStatefulset.Namespace,
  149. LabelSelector: labelSelector,
  150. }
  151. err = r.client.List(context.TODO(), listOpts, podList)
  152. if err != nil {
  153. reqLogger.Error(err, "Failed to list Pods.", "Namespace", instance.Namespace, "Name", instance.Name)
  154. return reconcile.Result{}, err
  155. }
  156. if len(podList.Items) < 1 {
  157. reqLogger.Error(err, "Pods < 0", "Namespace", instance.Namespace, "Name", instance.Name)
  158. return reconcile.Result{}, err
  159. }
  160. // Order the pods so we start by the oldest one
  161. sort.Slice(podList.Items, func(i, j int) bool {
  162. return podList.Items[i].CreationTimestamp.Before(&podList.Items[j].CreationTimestamp)
  163. })
  164. newMasterIP := ""
  165. podIPs := []string{}
  166. for _, pod := range podList.Items {
  167. // pod will be deleted, skip
  168. if pod.GetObjectMeta().GetDeletionTimestamp() != nil {
  169. continue
  170. }
  171. if pod.Status.Phase == corev1.PodPending || pod.Status.Phase == corev1.PodRunning {
  172. // for haproxy if enabled
  173. podIPs = append(podIPs, pod.Status.PodIP)
  174. if newMasterIP == "" {
  175. newMasterIP = pod.Status.PodIP
  176. reqLogger.Info("New master ip", newMasterIP, instance.Namespace, "Name", instance.Name)
  177. if err := querySetMaster(newMasterIP); err != nil {
  178. reqLogger.Error(err, "Error! New master ip", newMasterIP, instance.Namespace, "Name", instance.Name)
  179. return reconcile.Result{}, err
  180. }
  181. } else {
  182. reqLogger.Info("Redis", pod.Name, "slaveof", newMasterIP, instance.Namespace, "Name", instance.Name)
  183. if err := querySetSlaveOf(pod.Status.PodIP, newMasterIP); err != nil {
  184. reqLogger.Error(err, "Error! Redis", pod.Name, "slaveof", newMasterIP, instance.Namespace, "Name", instance.Name)
  185. return reconcile.Result{}, err
  186. }
  187. }
  188. }
  189. }
  190. // haproxy
  191. // check if you need haproxy
  192. if instance.Spec.UseHAProxy {
  193. configHaproxyShepherdName := instance.Name + "-haproxy-shepherd"
  194. configHaproxyShepherdData := map[string]string{"shepherd.sh": `
  195. #!/bin/sh
  196. echo "Start"
  197. MONFILE='/usr/local/etc/haproxy/haproxy.cfg'
  198. PIDFILE='/run/haproxy.pid'
  199. MD5FILE='/tmp/haproxy.cfg.md5'
  200. touch ${MD5FILE}
  201. while true
  202. do
  203. MD5LAST="$(cat ${MD5FILE})"
  204. echo "Read MD5 of ${MD5FILE}: ${MD5LAST}"
  205. if [ -z "${MD5LAST}" ]
  206. then
  207. echo "First time check, md5 file is empty"
  208. echo "$(md5sum ${MONFILE})" > ${MD5FILE}
  209. else
  210. echo "Get md5 and compare with last time"
  211. MD5CURRENT="$(md5sum ${MONFILE})"
  212. if [ "${MD5CURRENT}" != "${MD5LAST}" ]
  213. then
  214. echo "Send signal to haproxy"
  215. kill -HUP $(cat ${PIDFILE})
  216. echo "${MD5CURRENT}" > ${MD5FILE}
  217. fi
  218. fi
  219. # sleep 5 seconds, it will be enough to not disturb haproxy while pods rapidly creates or dies
  220. sleep 5
  221. done
  222. `}
  223. if _, err := r.ReconcileConfigmap(reqLogger, instance, configHaproxyShepherdName, configHaproxyShepherdData); err != nil {
  224. return reconcile.Result{}, err
  225. }
  226. redisEndpointTemplate := " server redis_backend_%v %v:6379 maxconn 1024 check inter %vs\n"
  227. redisEndpoints := ""
  228. haproxyBackendCheckInterval := 1
  229. if instance.Spec.HAProxyBackendCheckInterval > 0 {
  230. haproxyBackendCheckInterval = instance.Spec.HAProxyBackendCheckInterval
  231. }
  232. for num, ip := range podIPs {
  233. redisEndpoints = redisEndpoints + fmt.Sprintf(redisEndpointTemplate, num, ip, haproxyBackendCheckInterval)
  234. }
  235. configHaproxyConfigName := instance.Name + "-haproxy"
  236. configHaproxyConfigData := map[string]string{"haproxy.cfg": `
  237. global
  238. pidfile /run/haproxy.pid
  239. defaults
  240. mode tcp
  241. timeout connect 5s
  242. timeout server %vs
  243. timeout client %vs
  244. option tcpka
  245. listen stats
  246. mode http
  247. bind :9000
  248. stats enable
  249. stats hide-version
  250. stats realm Haproxy\ Statistics
  251. stats uri /haproxy_stats
  252. frontend ft_redis
  253. mode tcp
  254. bind *:6379
  255. default_backend bk_redis
  256. backend bk_redis
  257. mode tcp
  258. option tcp-check
  259. tcp-check send PING\r\n
  260. tcp-check expect string +PONG
  261. tcp-check send info\ replication\r\n
  262. tcp-check expect string role:master
  263. tcp-check send QUIT\r\n
  264. tcp-check expect string +OK
  265. `}
  266. haproxyTimeoutServer := 30
  267. if instance.Spec.HAProxyTimeoutServer > 0 {
  268. haproxyTimeoutServer = instance.Spec.HAProxyTimeoutServer
  269. }
  270. haproxyTimeoutClient := 30
  271. if instance.Spec.HAProxyTimeoutClient > 0 {
  272. haproxyTimeoutClient = instance.Spec.HAProxyTimeoutClient
  273. }
  274. configHaproxyConfigData["haproxy.cfg"] = fmt.Sprintf(configHaproxyConfigData["haproxy.cfg"], haproxyTimeoutServer, haproxyTimeoutClient) + redisEndpoints
  275. if _, err := r.ReconcileConfigmap(reqLogger, instance, configHaproxyConfigName, configHaproxyConfigData); err != nil {
  276. return reconcile.Result{}, err
  277. }
  278. // reconcile HAProxy deployment
  279. newHAProxyDeployment := manifests.GenerateHaproxyDeployment(instance)
  280. if _, err := r.ReconcileDeployment(reqLogger, instance, newHAProxyDeployment); err != nil {
  281. return reconcile.Result{}, err
  282. }
  283. // create haproxy service
  284. serviceName = instance.Name + "-haproxy"
  285. servicePortName = "haproxy"
  286. servicePort = int32(6379)
  287. serviceSelector = map[string]string{"component": "haproxy"}
  288. if _, err := r.ReconcileService(reqLogger, instance, serviceName, map[string]int32{servicePortName: servicePort, "stats": 9000}, serviceSelector); err != nil {
  289. return reconcile.Result{}, err
  290. }
  291. }
  292. reqLogger.Info("Reconcile complete", "Namespace", instance.Namespace, "Name", instance.Name)
  293. return reconcile.Result{}, nil
  294. }
  295. func querySetMaster(ip string) error {
  296. options := &rediscli.Options{
  297. Addr: fmt.Sprintf("%s:%s", ip, "6379"),
  298. Password: "",
  299. DB: 0,
  300. }
  301. rClient := rediscli.NewClient(options)
  302. defer rClient.Close()
  303. if res := rClient.SlaveOf("NO", "ONE"); res.Err() != nil {
  304. return res.Err()
  305. }
  306. return nil
  307. }
  308. func querySetSlaveOf(ip string, masterIP string) error {
  309. options := &rediscli.Options{
  310. Addr: fmt.Sprintf("%s:%s", ip, "6379"),
  311. Password: "",
  312. DB: 0,
  313. }
  314. rClient := rediscli.NewClient(options)
  315. defer rClient.Close()
  316. if res := rClient.SlaveOf(masterIP, "6379"); res.Err() != nil {
  317. return res.Err()
  318. }
  319. return nil
  320. }