Browse Source

add haproxy support

add haproxy deployment, configmaps and service
refactoring of reconcile
tags/0.0.2
Vladimir Smagin 1 year ago
committed by Gitea
parent
commit
2bd6453a07
9 changed files with 471 additions and 557 deletions
  1. +2
    -0
      deploy/test-instance.yaml
  2. +1
    -0
      go.mod
  3. +6
    -0
      go.sum
  4. +5
    -0
      pkg/apis/blindage/v1alpha1/zz_generated.deepcopy.go
  5. +0
    -310
      pkg/controller/redis/haproxy.go
  6. +146
    -0
      pkg/controller/redis/manifest_haproxy.go
  7. +165
    -0
      pkg/controller/redis/reconcile.go
  8. +5
    -5
      pkg/controller/redis/reconcile_rules.go
  9. +141
    -242
      pkg/controller/redis/redis_controller.go

+ 2
- 0
deploy/test-instance.yaml View File

@@ -3,6 +3,8 @@ kind: Redis
metadata:
name: my-redis
spec:
useHAProxy: true
haproxyReplicas: 1
sentinelReplicas: 3
redisReplicas: 3
quorum: 2

+ 1
- 0
go.mod View File

@@ -4,6 +4,7 @@ require (
github.com/NYTimes/gziphandler v1.0.1 // indirect
github.com/certifi/gocertifi v0.0.0-20190506164543-d2eda7129713 // indirect
github.com/getsentry/raven-go v0.2.0
github.com/go-logr/logr v0.1.0
github.com/go-openapi/spec v0.19.2
github.com/go-redis/redis v0.0.0-20190813142431-c5c4ad6a4cae
github.com/operator-framework/operator-sdk v0.10.1-0.20190820174346-abac23c897b8


+ 6
- 0
go.sum View File

@@ -226,6 +226,7 @@ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
github.com/hashicorp/golang-lru v0.5.3 h1:YPkqC67at8FYaadspW/6uE0COsBxS2656RLEr8Bppgk=
github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/huandu/xstrings v1.2.0/go.mod h1:DvyZB1rfVYsBIigL8HwpZgxHwXozlTgGqn63UyNX5k4=
github.com/iancoleman/strcase v0.0.0-20180726023541-3605ed457bf7/go.mod h1:SK73tn/9oHe+/Y0h39VT4UCxmurVJkR5NA7kMEAOgSE=
@@ -291,10 +292,12 @@ github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+
github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.8.0 h1:VkHVNpR4iVnU8XQR6DBm8BqYjN7CRzw+xKUbVVbbW9w=
github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/onsi/gomega v1.4.2-0.20180831124310-ae19f1b56d53/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.5.0 h1:izbySO9zDPmjJ8rDjLvkA2zJHIo+HkYXHnf7eN7SSyo=
github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s=
github.com/openshift/origin v0.0.0-20160503220234-8f127d736703/go.mod h1:0Rox5r9C8aQn6j1oAOQ0c1uC86mYbUFObzjBRvUKHII=
@@ -556,6 +559,7 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLks
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/inf.v0 v0.9.0/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
@@ -564,6 +568,7 @@ gopkg.in/natefinch/lumberjack.v2 v2.0.0-20170531160350-a96e63847dc3/go.mod h1:l0
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
gopkg.in/square/go-jose.v2 v2.3.0/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
@@ -627,6 +632,7 @@ sigs.k8s.io/controller-runtime v0.1.12/go.mod h1:HFAYoOh6XMV+jKF1UjFwrknPbowfyHE
sigs.k8s.io/controller-tools v0.1.11-0.20190411181648-9d55346c2bde h1:ZkaHf5rNYzIB6CB82keKMQNv7xxkqT0ylOBdfJPfi+k=
sigs.k8s.io/controller-tools v0.1.11-0.20190411181648-9d55346c2bde/go.mod h1:ATWLRP3WGxuAN9HcT2LaKHReXIH+EZGzRuMHuxjXfhQ=
sigs.k8s.io/structured-merge-diff v0.0.0-20190525122527-15d366b2352e/go.mod h1:wWxsB5ozmmv/SG7nM11ayaAW51xMvak/t1r0CSlcokI=
sigs.k8s.io/testing_frameworks v0.1.1 h1:cP2l8fkA3O9vekpy5Ks8mmA0NW/F7yBdXf8brkWhVrs=
sigs.k8s.io/testing_frameworks v0.1.1/go.mod h1:VVBKrHmJ6Ekkfz284YKhQePcdycOzNH9qL6ht1zEr/U=
sigs.k8s.io/yaml v1.1.0 h1:4A07+ZFc2wgJwo8YNlQpr1rVlgUDlxXHhPJciaPY5gs=
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=


+ 5
- 0
pkg/apis/blindage/v1alpha1/zz_generated.deepcopy.go View File

@@ -90,6 +90,11 @@ func (in *RedisSpec) DeepCopyInto(out *RedisSpec) {
**out = **in
}
out.PersistentVolumeSize = in.PersistentVolumeSize.DeepCopy()
if in.HAProxyReplicas != nil {
in, out := &in.HAProxyReplicas, &out.HAProxyReplicas
*out = new(int32)
**out = **in
}
if in.PodRequests != nil {
in, out := &in.PodRequests, &out.PodRequests
*out = new(v1.ResourceList)


+ 0
- 310
pkg/controller/redis/haproxy.go View File

@@ -1,310 +0,0 @@
package redis

import (
blindagev1alpha1 "git.blindage.org/21h/redis-operator/pkg/apis/blindage/v1alpha1"

"fmt"
"sort"

v1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func generateHaproxyDeployment(cr *blindagev1alpha1.Redis) v1.Deployment {
image := "haproxy:1.9.4-alpine"
if cr.Spec.HAProxyImage != "" {
image = cr.Spec.Image
}

replicas := int32(1)
if cr.Spec.HAProxyReplicas != nil {
replicas = *cr.Spec.HAProxyReplicas
}
shareProcessNamespace := true
return v1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: cr.Name + "-haproxy",
Namespace: cr.Namespace,
},
Spec: v1.DeploymentSpec{
Replicas: &replicas,
Selector: &metav1.LabelSelector{
MatchLabels: mergeLabels(baseLabels(cr), map[string]string{"component": "haproxy"}),
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: mergeLabels(baseLabels(cr), map[string]string{"component": "haproxy"}),
},
Spec: corev1.PodSpec{
ShareProcessNamespace: &shareProcessNamespace,
Containers: []corev1.Container{
{
Name: "haproxy",
Image: image,
ImagePullPolicy: "Always",
Ports: []corev1.ContainerPort{
{
Name: "haproxy",
ContainerPort: 6379,
Protocol: corev1.ProtocolTCP,
},
},
VolumeMounts: []corev1.VolumeMount{
{
Name: "haproxy-config",
MountPath: "/usr/local/etc/haproxy",
},
{
Name: "pid",
MountPath: "/run",
},
},
},
{
Name: "shepherd",
Image: "busybox",
Command: []string{
"sh",
"/shepherd.sh"},
ReadinessProbe: &corev1.Probe{
InitialDelaySeconds: 30,
TimeoutSeconds: 5,
PeriodSeconds: 30,
Handler: corev1.Handler{
Exec: &corev1.ExecAction{
Command: []string{
"cat",
"/usr/local/etc/haproxy/haproxy.cfg",
},
},
},
},
LivenessProbe: &corev1.Probe{
InitialDelaySeconds: 30,
TimeoutSeconds: 5,
PeriodSeconds: 30,
Handler: corev1.Handler{
Exec: &corev1.ExecAction{
Command: []string{
"cat",
"/run/haproxy.pid",
},
},
},
},
VolumeMounts: []corev1.VolumeMount{
{
Name: "shepherd",
MountPath: "/shepherd.sh",
SubPath: "shepherd.sh",
},
{
Name: "haproxy-config",
MountPath: "/usr/local/etc/haproxy",
},
{
Name: "pid",
MountPath: "/run",
},
},
},
},
Volumes: []corev1.Volume{
{
Name: "pid",
VolumeSource: corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{},
},
},
{
// reads pid from /run/haproxy.pid and sends signal if haproxy.cfg was changed
// from last check time
Name: "shepherd",
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: cr.Name + "-shepherd",
},
},
},
},
{
Name: "haproxy-config",
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: cr.Name,
},
},
},
},
},
},
},
},
}
}

func GetHAProxyService(redis *v1.Redis) *corev1.Service {
name := GetHAProxyName(redis.Name)
namespace := redis.Namespace

labels := HaproxyLabelSelector(redis)

return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Annotations: map[string]string{
"prometheus.io/scrape": "true",
"prometheus.io/port": "http",
"prometheus.io/path": "/haproxy_stats",
},
OwnerReferences: []metav1.OwnerReference{
redis.AsOwner(),
},
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
ClusterIP: corev1.ClusterIPNone,
Ports: []corev1.ServicePort{
{
Port: 9000,
Protocol: corev1.ProtocolTCP,
Name: "exporter",
},
{
Port: 6379,
Protocol: corev1.ProtocolTCP,
Name: "haproxy",
},
},
Selector: labels,
},
}
}

func GetHAProxyShepherd(redis *v1.Redis) *corev1.ConfigMap {
name := GetHAProxyName(redis.Name) + "-shepherd"
namespace := redis.Namespace

shepherdFileContent := `#!/bin/sh
echo "Start"
MONFILE='/usr/local/etc/haproxy/haproxy.cfg'
PIDFILE='/run/haproxy.pid'
MD5FILE='/tmp/haproxy.cfg.md5'

touch ${MD5FILE}

while true
do
MD5LAST="$(cat ${MD5FILE})"
echo "Read MD5 of ${MD5FILE}: ${MD5LAST}"
if [ -z "${MD5LAST}" ]
then
echo "First time check, md5 file is empty"
echo "$(md5sum ${MONFILE})" > ${MD5FILE}
else
echo "Get md5 and compare with last time"
MD5CURRENT="$(md5sum ${MONFILE})"
if [ "${MD5CURRENT}" != "${MD5LAST}" ]
then
echo "Send signal to haproxy"
kill -HUP $(cat ${PIDFILE})
echo "${MD5CURRENT}" > ${MD5FILE}
fi
fi
# sleep 5 seconds, it will be enough to not disturb haproxy while pods rapidly creates or dies
sleep 5
done
`
return &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
OwnerReferences: []metav1.OwnerReference{
redis.AsOwner(),
},
},
Data: map[string]string{
"shepherd.sh": shepherdFileContent,
},
}
}

func GetHAProxyConfigMap(redis *v1.Redis, epList []*corev1.Endpoints) *corev1.ConfigMap {
name := GetHAProxyName(redis.Name)
namespace := redis.Namespace

ips := []string{}
for _, endpoint := range epList {
// if endpoint.Name == GetMasterServiceName(redis.Name) {
// for _, subset := range endpoint.Subsets {
// for _, ip := range subset.Addresses {
// ips = append(ips, ip.IP)
// }
// }
// }
if endpoint.Name == GetSlavesName(redis.Name) {
for _, subset := range endpoint.Subsets {
for _, ip := range subset.Addresses {
ips = append(ips, ip.IP)
}
}
}
}

// sort to make list stable because API returns random every time
sort.Strings(ips)

template := " server redis_backend_%v %v:6379 maxconn 1024 check inter 1s\n"
haproxyRedisEndpoints := ""
for num, ip := range ips {
haproxyRedisEndpoints = haproxyRedisEndpoints + fmt.Sprintf(template, num, ip)
}

sentinelConfigFileContent := `
global
pidfile /run/haproxy.pid
defaults
mode tcp
timeout connect 3s
timeout server 6s
timeout client 6s
listen stats
mode http
bind :9000
stats enable
stats hide-version
stats realm Haproxy\ Statistics
stats uri /haproxy_stats
frontend ft_redis
mode tcp
bind *:6379
default_backend bk_redis
backend bk_redis
mode tcp
option tcp-check
tcp-check send PING\r\n
tcp-check expect string +PONG
tcp-check send info\ replication\r\n
tcp-check expect string role:master
tcp-check send QUIT\r\n
tcp-check expect string +OK
`

sentinelConfigFileContent = sentinelConfigFileContent + haproxyRedisEndpoints
return &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
OwnerReferences: []metav1.OwnerReference{
redis.AsOwner(),
},
},
Data: map[string]string{
"haproxy.cfg": sentinelConfigFileContent,
},
}
}

+ 146
- 0
pkg/controller/redis/manifest_haproxy.go View File

@@ -0,0 +1,146 @@
package redis

import (
blindagev1alpha1 "git.blindage.org/21h/redis-operator/pkg/apis/blindage/v1alpha1"

v1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func generateHaproxyDeployment(cr *blindagev1alpha1.Redis) v1.Deployment {
image := "haproxy:1.9.4-alpine"
if cr.Spec.HAProxyImage != "" {
image = cr.Spec.Image
}

replicas := int32(1)
if cr.Spec.HAProxyReplicas != nil {
replicas = *cr.Spec.HAProxyReplicas
}

shareProcessNamespace := true

return v1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: cr.Name + "-haproxy",
Namespace: cr.Namespace,
},
Spec: v1.DeploymentSpec{
Replicas: &replicas,
Selector: &metav1.LabelSelector{
MatchLabels: mergeLabels(baseLabels(cr), map[string]string{"component": "haproxy"}),
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: mergeLabels(baseLabels(cr), map[string]string{"component": "haproxy"}),
},
Spec: corev1.PodSpec{
ShareProcessNamespace: &shareProcessNamespace,
Containers: []corev1.Container{
{
Name: "haproxy",
Image: image,
ImagePullPolicy: "Always",
Ports: []corev1.ContainerPort{
{
Name: "haproxy",
ContainerPort: 6379,
Protocol: corev1.ProtocolTCP,
},
},
VolumeMounts: []corev1.VolumeMount{
{
Name: "haproxy-config",
MountPath: "/usr/local/etc/haproxy",
},
{
Name: "pid",
MountPath: "/run",
},
},
},
{
Name: "shepherd",
Image: "busybox",
Command: []string{
"sh",
"/shepherd.sh"},
ReadinessProbe: &corev1.Probe{
InitialDelaySeconds: 30,
TimeoutSeconds: 5,
PeriodSeconds: 30,
Handler: corev1.Handler{
Exec: &corev1.ExecAction{
Command: []string{
"cat",
"/usr/local/etc/haproxy/haproxy.cfg",
},
},
},
},
LivenessProbe: &corev1.Probe{
InitialDelaySeconds: 30,
TimeoutSeconds: 5,
PeriodSeconds: 30,
Handler: corev1.Handler{
Exec: &corev1.ExecAction{
Command: []string{
"cat",
"/run/haproxy.pid",
},
},
},
},
VolumeMounts: []corev1.VolumeMount{
{
Name: "shepherd",
MountPath: "/shepherd.sh",
SubPath: "shepherd.sh",
},
{
Name: "haproxy-config",
MountPath: "/usr/local/etc/haproxy",
},
{
Name: "pid",
MountPath: "/run",
},
},
},
},
Volumes: []corev1.Volume{
{
Name: "pid",
VolumeSource: corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{},
},
},
{
// reads pid from /run/haproxy.pid and sends signal if haproxy.cfg was changed
// from last check time
Name: "shepherd",
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: cr.Name + "-haproxy-shepherd",
},
},
},
},
{
Name: "haproxy-config",
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: cr.Name + "-haproxy",
},
},
},
},
},
},
},
},
}
}

+ 165
- 0
pkg/controller/redis/reconcile.go View File

@@ -0,0 +1,165 @@
package redis

import (
"context"

v1 "k8s.io/api/apps/v1"

blindagev1alpha1 "git.blindage.org/21h/redis-operator/pkg/apis/blindage/v1alpha1"
raven "github.com/getsentry/raven-go"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// ReconcileConfigmap means full reconcile of configmap
func (r *ReconcileRedis) ReconcileConfigmap(reqLogger logr.Logger, cr *blindagev1alpha1.Redis, configmapName string, configmapData map[string]string) (reconcile.Result, error) {

newConfigmap := generateConfigmap(cr, configmapName, configmapData)

if err := controllerutil.SetControllerReference(cr, &newConfigmap, r.scheme); err != nil {
raven.CaptureErrorAndWait(err, nil)
return reconcile.Result{}, err
}

foundConfigmap := corev1.ConfigMap{}
err := r.client.Get(context.TODO(), types.NamespacedName{Name: newConfigmap.Name, Namespace: newConfigmap.Namespace}, &foundConfigmap)
if err != nil && errors.IsNotFound(err) {
reqLogger.Info("Creating a Sentinel Configmap", "Namespace", newConfigmap.Namespace, "Name", newConfigmap.Name)
err = r.client.Create(context.TODO(), &newConfigmap)
if err != nil {
reqLogger.Info("Creating Sentinel Configmap error", "Namespace", newConfigmap.Namespace, "Name", newConfigmap.Name, "Error", err)
raven.CaptureErrorAndWait(err, nil)
return reconcile.Result{}, err
}
} else if err != nil {
raven.CaptureErrorAndWait(err, nil)
return reconcile.Result{}, err
} else {
if reconcileRequired, reconService := reconcileConfigmapApply(foundConfigmap, newConfigmap); reconcileRequired {
reqLogger.Info("Updating Sentinel Configmap", "Namespace", reconService.Namespace, "Name", reconService.Name)
if err = r.client.Update(context.TODO(), &reconService); err != nil {
reqLogger.Info("Reconcile Sentinel Configmap error", "Namespace", foundConfigmap.Namespace, "Name", foundConfigmap.Name, "Error", err)
raven.CaptureErrorAndWait(err, nil)
return reconcile.Result{}, err
}
}
}

reqLogger.Info("Reconcile configmap complete", "Configmap Name", configmapName, "Namespace", cr.Namespace, "Name", cr.Name)
return reconcile.Result{}, nil
}

// ReconcileService one function to reconcile all services
func (r *ReconcileRedis) ReconcileService(reqLogger logr.Logger, cr *blindagev1alpha1.Redis, serviceName, portName string, portNum int32, selector map[string]string) (reconcile.Result, error) {

// reconcile Service
newService := generateService(cr, serviceName, portName, portNum, selector)

if err := controllerutil.SetControllerReference(cr, &newService, r.scheme); err != nil {
raven.CaptureErrorAndWait(err, nil)
return reconcile.Result{}, err
}

foundService := corev1.Service{}
err := r.client.Get(context.TODO(), types.NamespacedName{Name: newService.Name, Namespace: newService.Namespace}, &foundService)
if err != nil && errors.IsNotFound(err) {
reqLogger.Info("Creating a new Service", "Namespace", newService.Namespace, "Name", newService.Name)
err = r.client.Create(context.TODO(), &newService)
if err != nil {
reqLogger.Info("Creating Service error", "Namespace", newService.Namespace, "Name", newService.Name, "Error", err)
raven.CaptureErrorAndWait(err, nil)
return reconcile.Result{}, err
}
} else if err != nil {
raven.CaptureErrorAndWait(err, nil)
return reconcile.Result{}, err
} else {
if reconcileRequired, reconService := reconcileServiceApply(foundService, newService); reconcileRequired {
reqLogger.Info("Updating Service", "Namespace", reconService.Namespace, "Name", reconService.Name)
if err = r.client.Update(context.TODO(), &reconService); err != nil {
reqLogger.Info("Reconcile Service error", "Namespace", foundService.Namespace, "Name", foundService.Name, "Error", err)
raven.CaptureErrorAndWait(err, nil)
return reconcile.Result{}, err
}
}
}
reqLogger.Info("Reconcile service complete", "Service Name", serviceName, "Namespace", cr.Namespace, "Name", cr.Name)
return reconcile.Result{}, nil
}

// ReconcileDeployment one function to reconcile all deployments
func (r *ReconcileRedis) ReconcileDeployment(reqLogger logr.Logger, cr *blindagev1alpha1.Redis, deployment v1.Deployment) (reconcile.Result, error) {

if err := controllerutil.SetControllerReference(cr, &deployment, r.scheme); err != nil {
raven.CaptureErrorAndWait(err, nil)
return reconcile.Result{}, err
}

// controller deployment
foundDeployment := v1.Deployment{}
err := r.client.Get(context.TODO(), types.NamespacedName{Name: deployment.Name, Namespace: deployment.Namespace}, &foundDeployment)
if err != nil && errors.IsNotFound(err) {
reqLogger.Info("Creating Deployment", "Namespace", deployment.Namespace, "Name", deployment.Name)
err = r.client.Create(context.TODO(), &deployment)
if err != nil {
reqLogger.Info("Creating Deployment error", "Namespace", deployment.Namespace, "Name", deployment.Name, "Error", err)
raven.CaptureErrorAndWait(err, nil)
return reconcile.Result{}, err
}
} else if err != nil {
raven.CaptureErrorAndWait(err, nil)
return reconcile.Result{}, err
} else {
if reconcileRequired, reconDeployment := reconcileDeploymentApply(foundDeployment, deployment); reconcileRequired {
reqLogger.Info("Updating Deployment", "Namespace", reconDeployment.Namespace, "Name", reconDeployment.Name)
if err = r.client.Update(context.TODO(), &reconDeployment); err != nil {
reqLogger.Info("Reconcile Deployment error", "Namespace", foundDeployment.Namespace, "Name", foundDeployment.Name, "Error", err)
raven.CaptureErrorAndWait(err, nil)
return reconcile.Result{}, err
}
}
}
reqLogger.Info("Reconcile Deployment complete", "Deployment Name", deployment, "Namespace", cr.Namespace, "Name", cr.Name)
return reconcile.Result{}, nil
}

// ReconcileStatefulSet one function to reconcile all StatefulSets
func (r *ReconcileRedis) ReconcileStatefulSet(reqLogger logr.Logger, cr *blindagev1alpha1.Redis, newStatefulset v1.StatefulSet) (reconcile.Result, error) {

if err := controllerutil.SetControllerReference(cr, &newStatefulset, r.scheme); err != nil {
raven.CaptureErrorAndWait(err, nil)
return reconcile.Result{}, err
}

// controller statefulset
foundStatefulset := v1.StatefulSet{}
err := r.client.Get(context.TODO(), types.NamespacedName{Name: newStatefulset.Name, Namespace: newStatefulset.Namespace}, &foundStatefulset)
if err != nil && errors.IsNotFound(err) {
reqLogger.Info("Creating a new Redis Statefulset", "Namespace", newStatefulset.Namespace, "Name", newStatefulset.Name)
err = r.client.Create(context.TODO(), &newStatefulset)
if err != nil {
reqLogger.Info("Creating Redis Statefulset error", "Namespace", newStatefulset.Namespace, "Name", newStatefulset.Name, "Error", err)
raven.CaptureErrorAndWait(err, nil)
return reconcile.Result{}, err
}
} else if err != nil {
raven.CaptureErrorAndWait(err, nil)
return reconcile.Result{}, err
} else {
if reconcileRequired, reconDeployment := reconcileStatefulsetApply(foundStatefulset, newStatefulset); reconcileRequired {
reqLogger.Info("Updating Redis Statefulset", "Namespace", reconDeployment.Namespace, "Name", reconDeployment.Name)
if err = r.client.Update(context.TODO(), &reconDeployment); err != nil {
reqLogger.Info("Reconcile Redis Statefulset error", "Namespace", foundStatefulset.Namespace, "Name", foundStatefulset.Name, "Error", err)
raven.CaptureErrorAndWait(err, nil)
return reconcile.Result{}, err
}
}
}

reqLogger.Info("Reconcile StatefulSet complete", "StatefulSet Name", newStatefulset, "Namespace", cr.Namespace, "Name", cr.Name)
return reconcile.Result{}, nil
}

+ 5
- 5
pkg/controller/redis/reconcile_rules.go View File

@@ -9,7 +9,7 @@ import (

// check if reconcile required

func reconcilePersistentVolumeClaim(foundVolume corev1.PersistentVolumeClaim, newVolume corev1.PersistentVolumeClaim) (bool, corev1.PersistentVolumeClaim) {
func reconcilePersistentVolumeClaimApply(foundVolume corev1.PersistentVolumeClaim, newVolume corev1.PersistentVolumeClaim) (bool, corev1.PersistentVolumeClaim) {

reconcileRequired := false

@@ -22,7 +22,7 @@ func reconcilePersistentVolumeClaim(foundVolume corev1.PersistentVolumeClaim, ne

}

func reconcileConfigmap(foundConfigmap corev1.ConfigMap, newConfigmap corev1.ConfigMap) (bool, corev1.ConfigMap) {
func reconcileConfigmapApply(foundConfigmap corev1.ConfigMap, newConfigmap corev1.ConfigMap) (bool, corev1.ConfigMap) {

reconcileRequired := false

@@ -34,7 +34,7 @@ func reconcileConfigmap(foundConfigmap corev1.ConfigMap, newConfigmap corev1.Con
return reconcileRequired, foundConfigmap
}

func reconcileStatefulset(foundStatefulSet v1.StatefulSet, newStatefulSet v1.StatefulSet) (bool, v1.StatefulSet) {
func reconcileStatefulsetApply(foundStatefulSet v1.StatefulSet, newStatefulSet v1.StatefulSet) (bool, v1.StatefulSet) {

reconcileRequired := false

@@ -62,7 +62,7 @@ func reconcileStatefulset(foundStatefulSet v1.StatefulSet, newStatefulSet v1.Sta

}

func reconcileDeployment(foundDeployment v1.Deployment, newDeployment v1.Deployment) (bool, v1.Deployment) {
func reconcileDeploymentApply(foundDeployment v1.Deployment, newDeployment v1.Deployment) (bool, v1.Deployment) {

reconcileRequired := false

@@ -90,7 +90,7 @@ func reconcileDeployment(foundDeployment v1.Deployment, newDeployment v1.Deploym

}

func reconcileService(foundService corev1.Service, newService corev1.Service) (bool, corev1.Service) {
func reconcileServiceApply(foundService corev1.Service, newService corev1.Service) (bool, corev1.Service) {

reconcileRequired := false



+ 141
- 242
pkg/controller/redis/redis_controller.go View File

@@ -15,10 +15,8 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -88,6 +86,7 @@ type ReconcileRedis struct {
scheme *runtime.Scheme
}

// Reconcile means magic begins
func (r *ReconcileRedis) Reconcile(request reconcile.Request) (reconcile.Result, error) {
reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
reqLogger.Info("Reconciling Redis")
@@ -113,37 +112,10 @@ sentinel parallel-syncs redismaster 2
`
configSentinelData := map[string]string{"sentinel.conf": fmt.Sprintf(configSentinelTemplate, instance.Name+"-sentinel", instance.Spec.Quorum)}

newConfigmap := generateConfigmap(instance, configSentinelName, configSentinelData)

if err := controllerutil.SetControllerReference(instance, &newConfigmap, r.scheme); err != nil {
raven.CaptureErrorAndWait(err, nil)
if _, err := r.ReconcileConfigmap(reqLogger, instance, configSentinelName, configSentinelData); err != nil {
return reconcile.Result{}, err
}

foundConfigmap := corev1.ConfigMap{}
err = r.client.Get(context.TODO(), types.NamespacedName{Name: newConfigmap.Name, Namespace: newConfigmap.Namespace}, &foundConfigmap)
if err != nil && errors.IsNotFound(err) {
reqLogger.Info("Creating a Sentinel Configmap", "Namespace", newConfigmap.Namespace, "Name", newConfigmap.Name)
err = r.client.Create(context.TODO(), &newConfigmap)
if err != nil {
reqLogger.Info("Creating Sentinel Configmap error", "Namespace", newConfigmap.Namespace, "Name", newConfigmap.Name, "Error", err)
raven.CaptureErrorAndWait(err, nil)
return reconcile.Result{}, err
}
} else if err != nil {
raven.CaptureErrorAndWait(err, nil)
return reconcile.Result{}, err
} else {
if reconcileRequired, reconService := reconcileConfigmap(foundConfigmap, newConfigmap); reconcileRequired {
reqLogger.Info("Updating Sentinel Configmap", "Namespace", reconService.Namespace, "Name", reconService.Name)
if err = r.client.Update(context.TODO(), &reconService); err != nil {
reqLogger.Info("Reconcile Sentinel Configmap error", "Namespace", foundConfigmap.Namespace, "Name", foundConfigmap.Name, "Error", err)
raven.CaptureErrorAndWait(err, nil)
return reconcile.Result{}, err
}
}
}

configRedisName := instance.Name + "-redis"
configRedisData := map[string]string{"redis.conf": `
slaveof 127.0.0.1 6379
@@ -152,37 +124,10 @@ save 900 1
save 300 10
`}

newConfigmap = generateConfigmap(instance, configRedisName, configRedisData)

if err := controllerutil.SetControllerReference(instance, &newConfigmap, r.scheme); err != nil {
raven.CaptureErrorAndWait(err, nil)
if _, err := r.ReconcileConfigmap(reqLogger, instance, configRedisName, configRedisData); err != nil {
return reconcile.Result{}, err
}

foundConfigmap = corev1.ConfigMap{}
err = r.client.Get(context.TODO(), types.NamespacedName{Name: newConfigmap.Name, Namespace: newConfigmap.Namespace}, &foundConfigmap)
if err != nil && errors.IsNotFound(err) {
reqLogger.Info("Creating a Redis Configmap", "Namespace", newConfigmap.Namespace, "Name", newConfigmap.Name)
err = r.client.Create(context.TODO(), &newConfigmap)
if err != nil {
reqLogger.Info("Creating Redis Configmap error", "Namespace", newConfigmap.Namespace, "Name", newConfigmap.Name, "Error", err)
raven.CaptureErrorAndWait(err, nil)
return reconcile.Result{}, err
}
} else if err != nil {
raven.CaptureErrorAndWait(err, nil)
return reconcile.Result{}, err
} else {
if reconcileRequired, reconService := reconcileConfigmap(foundConfigmap, newConfigmap); reconcileRequired {
reqLogger.Info("Updating Redis Configmap", "Namespace", reconService.Namespace, "Name", reconService.Name)
if err = r.client.Update(context.TODO(), &reconService); err != nil {
reqLogger.Info("Reconcile Redis Configmap error", "Namespace", foundConfigmap.Namespace, "Name", foundConfigmap.Name, "Error", err)
raven.CaptureErrorAndWait(err, nil)
return reconcile.Result{}, err
}
}
}

configFailoverName := instance.Name + "-failover"
configFailoverData := map[string]string{"failover.conf": `
MASTER_HOST=$(redis-cli -h ${SENTINEL_SERVICE} -p 26379 --csv SENTINEL get-master-addr-by-name redismaster | tr ',' ' ' | tr -d '\"' |cut -d' ' -f1)
@@ -191,173 +136,45 @@ if [[ ${MASTER_HOST} == $(hostname -i) ]]; then
fi
`}

newConfigmap = generateConfigmap(instance, configFailoverName, configFailoverData)

if err := controllerutil.SetControllerReference(instance, &newConfigmap, r.scheme); err != nil {
raven.CaptureErrorAndWait(err, nil)
return reconcile.Result{}, err
}

foundConfigmap = corev1.ConfigMap{}
err = r.client.Get(context.TODO(), types.NamespacedName{Name: newConfigmap.Name, Namespace: newConfigmap.Namespace}, &foundConfigmap)
if err != nil && errors.IsNotFound(err) {
reqLogger.Info("Creating a Failover Configmap", "Namespace", newConfigmap.Namespace, "Name", newConfigmap.Name)
err = r.client.Create(context.TODO(), &newConfigmap)
if err != nil {
reqLogger.Info("Creating Failover Configmap error", "Namespace", newConfigmap.Namespace, "Name", newConfigmap.Name, "Error", err)
raven.CaptureErrorAndWait(err, nil)
return reconcile.Result{}, err
}
} else if err != nil {
raven.CaptureErrorAndWait(err, nil)
if _, err := r.ReconcileConfigmap(reqLogger, instance, configFailoverName, configFailoverData); err != nil {
return reconcile.Result{}, err
} else {
if reconcileRequired, reconService := reconcileConfigmap(foundConfigmap, newConfigmap); reconcileRequired {
reqLogger.Info("Updating Failover Configmap", "Namespace", reconService.Namespace, "Name", reconService.Name)
if err = r.client.Update(context.TODO(), &reconService); err != nil {
reqLogger.Info("Reconcile Failover Configmap error", "Namespace", foundConfigmap.Namespace, "Name", foundConfigmap.Name, "Error", err)
raven.CaptureErrorAndWait(err, nil)
return reconcile.Result{}, err
}
}
}

// reconcile deployment
// reconcile Sentinel deployment
newSentinelDeployment := generateDeployment(instance)

if err := controllerutil.SetControllerReference(instance, &newSentinelDeployment, r.scheme); err != nil {
raven.CaptureErrorAndWait(err, nil)
if _, err := r.ReconcileDeployment(reqLogger, instance, newSentinelDeployment); err != nil {
return reconcile.Result{}, err
}

// controller deployment
foundSentinelDeployment := v1.Deployment{}
err = r.client.Get(context.TODO(), types.NamespacedName{Name: newSentinelDeployment.Name, Namespace: newSentinelDeployment.Namespace}, &foundSentinelDeployment)
if err != nil && errors.IsNotFound(err) {
reqLogger.Info("Creating Sentinel Deployment", "Namespace", newSentinelDeployment.Namespace, "Name", newSentinelDeployment.Name)
err = r.client.Create(context.TODO(), &newSentinelDeployment)
if err != nil {
reqLogger.Info("Creating Sentinel Deployment error", "Namespace", newSentinelDeployment.Namespace, "Name", newSentinelDeployment.Name, "Error", err)
raven.CaptureErrorAndWait(err, nil)
return reconcile.Result{}, err
}
} else if err != nil {
raven.CaptureErrorAndWait(err, nil)
return reconcile.Result{}, err
} else {
if reconcileRequired, reconDeployment := reconcileDeployment(foundSentinelDeployment, newSentinelDeployment); reconcileRequired {
reqLogger.Info("Updating Sentinel Deployment", "Namespace", reconDeployment.Namespace, "Name", reconDeployment.Name)
if err = r.client.Update(context.TODO(), &reconDeployment); err != nil {
reqLogger.Info("Reconcile Sentinel Deployment error", "Namespace", foundSentinelDeployment.Namespace, "Name", foundSentinelDeployment.Name, "Error", err)
raven.CaptureErrorAndWait(err, nil)
return reconcile.Result{}, err
}
}
}

// reconcile Redis StatefulSet
newRedisStatefulset := generateStatefulSet(instance)

if err := controllerutil.SetControllerReference(instance, &newRedisStatefulset, r.scheme); err != nil {
raven.CaptureErrorAndWait(err, nil)
return reconcile.Result{}, err
}

// controller statefulset
foundRedisStatefulset := v1.StatefulSet{}
err = r.client.Get(context.TODO(), types.NamespacedName{Name: newRedisStatefulset.Name, Namespace: newRedisStatefulset.Namespace}, &foundRedisStatefulset)
if err != nil && errors.IsNotFound(err) {
reqLogger.Info("Creating a new Redis Statefulset", "Namespace", newRedisStatefulset.Namespace, "Name", newRedisStatefulset.Name)
err = r.client.Create(context.TODO(), &newRedisStatefulset)
if err != nil {
reqLogger.Info("Creating Redis Statefulset error", "Namespace", newRedisStatefulset.Namespace, "Name", newRedisStatefulset.Name, "Error", err)
raven.CaptureErrorAndWait(err, nil)
return reconcile.Result{}, err
}
} else if err != nil {
raven.CaptureErrorAndWait(err, nil)
return reconcile.Result{}, err
} else {
if reconcileRequired, reconDeployment := reconcileStatefulset(foundRedisStatefulset, newRedisStatefulset); reconcileRequired {
reqLogger.Info("Updating Redis Statefulset", "Namespace", reconDeployment.Namespace, "Name", reconDeployment.Name)
if err = r.client.Update(context.TODO(), &reconDeployment); err != nil {
reqLogger.Info("Reconcile Redis Statefulset error", "Namespace", foundRedisStatefulset.Namespace, "Name", foundRedisStatefulset.Name, "Error", err)
raven.CaptureErrorAndWait(err, nil)
return reconcile.Result{}, err
}
}
}

// reconcile Sentinel Service
newSentinelService := generateService(instance, instance.Name+"-sentinel", "sentinel", int32(26379), map[string]string{"component": "sentinel"})

if err := controllerutil.SetControllerReference(instance, &newSentinelService, r.scheme); err != nil {
raven.CaptureErrorAndWait(err, nil)
if _, err := r.ReconcileStatefulSet(reqLogger, instance, newRedisStatefulset); err != nil {
return reconcile.Result{}, err
}

foundSentinelService := corev1.Service{}
err = r.client.Get(context.TODO(), types.NamespacedName{Name: newSentinelService.Name, Namespace: newSentinelService.Namespace}, &foundSentinelService)
if err != nil && errors.IsNotFound(err) {
reqLogger.Info("Creating a new Sentinel Service", "Namespace", newSentinelService.Namespace, "Name", newSentinelService.Name)
err = r.client.Create(context.TODO(), &newSentinelService)
if err != nil {
reqLogger.Info("Creating Sentinel Service error", "Namespace", newSentinelService.Namespace, "Name", newSentinelService.Name, "Error", err)
raven.CaptureErrorAndWait(err, nil)
return reconcile.Result{}, err
}
} else if err != nil {
raven.CaptureErrorAndWait(err, nil)
// create sentinel and redis services
serviceName := instance.Name + "-sentinel"
servicePortName := "sentinel"
servicePort := int32(26379)
serviceSelector := map[string]string{"component": "sentinel"}
if _, err := r.ReconcileService(reqLogger, instance, serviceName, servicePortName, servicePort, serviceSelector); err != nil {
return reconcile.Result{}, err
} else {
if reconcileRequired, reconService := reconcileService(foundSentinelService, newSentinelService); reconcileRequired {
reqLogger.Info("Updating Sentinel Service", "Namespace", reconService.Namespace, "Name", reconService.Name)
if err = r.client.Update(context.TODO(), &reconService); err != nil {
reqLogger.Info("Reconcile Sentinel Service error", "Namespace", foundSentinelService.Namespace, "Name", foundSentinelService.Name, "Error", err)
raven.CaptureErrorAndWait(err, nil)
return reconcile.Result{}, err
}
}
}

// reconcile Redis Service
newRedisService := generateService(instance, instance.Name+"-redis", "redis", int32(6379), map[string]string{"component": "redis"})
if err := controllerutil.SetControllerReference(instance, &newRedisService, r.scheme); err != nil {
raven.CaptureErrorAndWait(err, nil)
serviceName = instance.Name + "-redis"
servicePortName = "redis"
servicePort = int32(6379)
serviceSelector = map[string]string{"component": "redis"}
if _, err := r.ReconcileService(reqLogger, instance, serviceName, servicePortName, servicePort, serviceSelector); err != nil {
return reconcile.Result{}, err
}

foundRedisService := corev1.Service{}
err = r.client.Get(context.TODO(), types.NamespacedName{Name: newRedisService.Name, Namespace: newRedisService.Namespace}, &foundRedisService)
if err != nil && errors.IsNotFound(err) {
reqLogger.Info("Creating a new Redis Service", "Namespace", newRedisService.Namespace, "Name", newRedisService.Name)
err = r.client.Create(context.TODO(), &newRedisService)
if err != nil {
reqLogger.Info("Creating Redis Service error", "Namespace", newRedisService.Namespace, "Name", newRedisService.Name, "Error", err)
raven.CaptureErrorAndWait(err, nil)
return reconcile.Result{}, err
}
} else if err != nil {
raven.CaptureErrorAndWait(err, nil)
return reconcile.Result{}, err
} else {
if reconcileRequired, reconService := reconcileService(foundRedisService, newRedisService); reconcileRequired {
reqLogger.Info("Updating Redis Service", "Namespace", reconService.Namespace, "Name", reconService.Name)
if err = r.client.Update(context.TODO(), &reconService); err != nil {
reqLogger.Info("Reconcile Redis Service error", "Namespace", foundRedisService.Namespace, "Name", foundRedisService.Name, "Error", err)
raven.CaptureErrorAndWait(err, nil)
return reconcile.Result{}, err
}
}
}

// check health of pods
// set Redis master

podList := &corev1.PodList{}
labelSelector := labels.SelectorFromSet(foundRedisStatefulset.Labels)
labelSelector := labels.SelectorFromSet(newRedisStatefulset.Labels)
listOpts := &client.ListOptions{
Namespace: foundRedisStatefulset.Namespace,
Namespace: newRedisStatefulset.Namespace,
LabelSelector: labelSelector,
}
err = r.client.List(context.TODO(), listOpts, podList)
@@ -377,55 +194,137 @@ fi
})

newMasterIP := ""
podIPs := []string{}

for _, pod := range podList.Items {
if newMasterIP == "" {
newMasterIP = pod.Status.PodIP
reqLogger.Info("New master ip", newMasterIP, instance.Namespace, "Name", instance.Name)
if err := querySetMaster(newMasterIP); err != nil {
reqLogger.Error(err, "Error! New master ip", newMasterIP, instance.Namespace, "Name", instance.Name)
return reconcile.Result{}, err
}
} else {
reqLogger.Info("Redis", pod.Name, "slaveof", newMasterIP, instance.Namespace, "Name", instance.Name)
if err := querySetSlaveOf(pod.Status.PodIP, newMasterIP); err != nil {
reqLogger.Error(err, "Error! Redis", pod.Name, "slaveof", newMasterIP, instance.Namespace, "Name", instance.Name)
return reconcile.Result{}, err
// pod will be deleted, skip
if pod.GetObjectMeta().GetDeletionTimestamp() != nil {
continue
}

if pod.Status.Phase == corev1.PodPending || pod.Status.Phase == corev1.PodRunning {
// for haproxy if enabled
podIPs = append(podIPs, pod.Status.PodIP)

if newMasterIP == "" {
newMasterIP = pod.Status.PodIP
reqLogger.Info("New master ip", newMasterIP, instance.Namespace, "Name", instance.Name)
if err := querySetMaster(newMasterIP); err != nil {
reqLogger.Error(err, "Error! New master ip", newMasterIP, instance.Namespace, "Name", instance.Name)
return reconcile.Result{}, err
}
} else {
reqLogger.Info("Redis", pod.Name, "slaveof", newMasterIP, instance.Namespace, "Name", instance.Name)
if err := querySetSlaveOf(pod.Status.PodIP, newMasterIP); err != nil {
reqLogger.Error(err, "Error! Redis", pod.Name, "slaveof", newMasterIP, instance.Namespace, "Name", instance.Name)
return reconcile.Result{}, err
}
}
}
}

// haproxy

// check if you need haproxy
if instance.Spec.UseHAProxy {
// check if you need haproxy
if instance.Spec.UseHAProxy {

configHaproxyShepherdName := instance.Name + "-haproxy-shepherd"
configHaproxyShepherdData := map[string]string{"shepherd.sh": `
#!/bin/sh
echo "Start"
MONFILE='/usr/local/etc/haproxy/haproxy.cfg'
PIDFILE='/run/haproxy.pid'
MD5FILE='/tmp/haproxy.cfg.md5'

touch ${MD5FILE}

while true
do
MD5LAST="$(cat ${MD5FILE})"
echo "Read MD5 of ${MD5FILE}: ${MD5LAST}"
if [ -z "${MD5LAST}" ]
then
echo "First time check, md5 file is empty"
echo "$(md5sum ${MONFILE})" > ${MD5FILE}
else
echo "Get md5 and compare with last time"
MD5CURRENT="$(md5sum ${MONFILE})"
if [ "${MD5CURRENT}" != "${MD5LAST}" ]
then
echo "Send signal to haproxy"
kill -HUP $(cat ${PIDFILE})
echo "${MD5CURRENT}" > ${MD5FILE}
fi
fi
# sleep 5 seconds, it will be enough to not disturb haproxy while pods rapidly creates or dies
sleep 5
done
`}

for _, pod := range podList.Items {
if pod.GetObjectMeta().GetDeletionTimestamp() != nil {
continue
if _, err := r.ReconcileConfigmap(reqLogger, instance, configHaproxyShepherdName, configHaproxyShepherdData); err != nil {
return reconcile.Result{}, err
}
if pod.Status.Phase == corev1.PodPending || pod.Status.Phase == corev1.PodRunning {
// ok, got pod name, check status

redisEndpointTemplate := " server redis_backend_%v %v:6379 maxconn 1024 check inter 1s\n"
redisEndpoints := ""

for num, ip := range podIPs {
redisEndpoints = redisEndpoints + fmt.Sprintf(redisEndpointTemplate, num, ip)
}
}

// deploy haproxy and shepherd
if err := c.haproxyDeployment(redis.Redis); err != nil {
c.logger.Println("haproxyDeployment error", err)
return err
}

// create haproxy endpoint
if err := c.haproxyServiceProcess(redis.Redis); err != nil {
c.logger.Println("haproxyServiceProcess error", err)
return err
}

// create endpoints of redis slaves
if err := c.redisSlavesServiceProcess(redis.Redis); err != nil {
c.logger.Println("redisSlavesServiceProcess error", err)
return err
}
}
configHaproxyConfigName := instance.Name + "-haproxy"
configHaproxyConfigData := map[string]string{"haproxy.cfg": `
global
pidfile /run/haproxy.pid
defaults
mode tcp
timeout connect 3s
timeout server 6s
timeout client 6s
listen stats
mode http
bind :9000
stats enable
stats hide-version
stats realm Haproxy\ Statistics
stats uri /haproxy_stats
frontend ft_redis
mode tcp
bind *:6379
default_backend bk_redis
backend bk_redis
mode tcp
option tcp-check
tcp-check send PING\r\n
tcp-check expect string +PONG
tcp-check send info\ replication\r\n
tcp-check expect string role:master
tcp-check send QUIT\r\n
tcp-check expect string +OK
`}

configHaproxyConfigData["haproxy.cfg"] = configHaproxyConfigData["haproxy.cfg"] + redisEndpoints

if _, err := r.ReconcileConfigmap(reqLogger, instance, configHaproxyConfigName, configHaproxyConfigData); err != nil {
return reconcile.Result{}, err
}

// reconcile HAProxy deployment
newHAProxyDeployment := generateHaproxyDeployment(instance)
if _, err := r.ReconcileDeployment(reqLogger, instance, newHAProxyDeployment); err != nil {
return reconcile.Result{}, err
}

// create haproxy service
serviceName = instance.Name + "-haproxy"
servicePortName = "haproxy"
servicePort = int32(6379)
serviceSelector = map[string]string{"component": "haproxy"}
if _, err := r.ReconcileService(reqLogger, instance, serviceName, servicePortName, servicePort, serviceSelector); err != nil {
return reconcile.Result{}, err
}

}

reqLogger.Info("Reconcile complete", "Namespace", instance.Namespace, "Name", instance.Name)
return reconcile.Result{}, nil


Loading…
Cancel
Save