confd源码分析

confd的源码参考:https://github.com/kelseyhightower/confd

本文分析的confd的版本是v0.16.0,代码参考:https://github.com/kelseyhightower/confd/tree/v0.16.0。

1. Main

confd的入口函数 Main 函数,先解析参数,如果是打印版本信息的参数,则执行打印版本的命令。

func main() {
	flag.Parse()
	if config.PrintVersion {
		fmt.Printf("confd %s (Git SHA: %s, Go Version: %s)\n", Version, GitSHA, runtime.Version())
		os.Exit(0)
	}
    ...
}    

其中版本信息记录在https://github.com/kelseyhightower/confd/blob/v0.16.0/version.go#L3

// Version is the confd release string printed by the version flag in main.
const Version = "0.16.0"

1.1. initConfig

初始化配置文件。

if err := initConfig(); err != nil {
	log.Fatal(err.Error())
}

initConfig函数对基本的配置内容做初始化,当没有指定后端存储的时候,设置默认存储。

// initConfig initializes the confd configuration by first setting defaults,
// then overriding settings from the confd config file, then overriding
// settings from environment variables, and finally overriding
// settings from flags set on the command line.
// It returns an error if any.
//
// NOTE(review): not every failure is returned. Keyring read failures call
// log.Fatal and an unsupported watch backend calls os.Exit(1) directly.
func initConfig() error {
	// A missing config file is tolerated; every other Stat outcome (including
	// success) goes on to read and decode the file as TOML into config.
	_, err := os.Stat(config.ConfigFile)
	if os.IsNotExist(err) {
		log.Debug("Skipping confd config file.")
	} else {
		log.Debug("Loading " + config.ConfigFile)
		configBytes, err := ioutil.ReadFile(config.ConfigFile)
		if err != nil {
			return err
		}

		_, err = toml.Decode(string(configBytes), &config)
		if err != nil {
			return err
		}
	}

	// Update config from environment variables.
	processEnv()

	// Load the whole keyring file into config.PGPPrivateKey when one is set.
	if config.SecretKeyring != "" {
		kr, err := os.Open(config.SecretKeyring)
		if err != nil {
			log.Fatal(err.Error())
		}
		defer kr.Close()
		config.PGPPrivateKey, err = ioutil.ReadAll(kr)
		if err != nil {
			log.Fatal(err.Error())
		}
	}

	if config.LogLevel != "" {
		log.SetLevel(config.LogLevel)
	}

	// Derive the SRV record name from the backend and domain when only the
	// domain was given.
	if config.SRVDomain != "" && config.SRVRecord == "" {
		config.SRVRecord = fmt.Sprintf("_%s._tcp.%s.", config.Backend, config.SRVDomain)
	}

	// Update BackendNodes from SRV records.
	if config.Backend != "env" && config.SRVRecord != "" {
		log.Info("SRV record set to " + config.SRVRecord)
		srvNodes, err := getBackendNodesFromSRV(config.SRVRecord)
		if err != nil {
			return errors.New("Cannot get nodes from SRV records " + err.Error())
		}

		// etcd nodes are given as URLs, so prefix each SRV result with the scheme.
		switch config.Backend {
		case "etcd":
			vsm := make([]string, len(srvNodes))
			for i, v := range srvNodes {
				vsm[i] = config.Scheme + "://" + v
			}
			srvNodes = vsm
		}

		config.BackendNodes = srvNodes
	}
	// Fall back to a per-backend localhost default when no nodes were
	// configured. Backends not listed here keep an empty node list.
	if len(config.BackendNodes) == 0 {
		switch config.Backend {
		case "consul":
			config.BackendNodes = []string{"127.0.0.1:8500"}
		case "etcd":
			// ETCDCTL_PEERS, a comma-separated list, wins over the default.
			peerstr := os.Getenv("ETCDCTL_PEERS")
			if len(peerstr) > 0 {
				config.BackendNodes = strings.Split(peerstr, ",")
			} else {
				config.BackendNodes = []string{"http://127.0.0.1:4001"}
			}
		case "etcdv3":
			config.BackendNodes = []string{"127.0.0.1:2379"}
		case "redis":
			config.BackendNodes = []string{"127.0.0.1:6379"}
		case "vault":
			config.BackendNodes = []string{"http://127.0.0.1:8200"}
		case "zookeeper":
			config.BackendNodes = []string{"127.0.0.1:2181"}
		}
	}
	// Initialize the storage client
	log.Info("Backend set to " + config.Backend)

	// Watch mode is rejected for the backends listed below; the process exits
	// with status 1 instead of returning an error.
	if config.Watch {
		unsupportedBackends := map[string]bool{
			"dynamodb": true,
			"ssm":      true,
		}

		if unsupportedBackends[config.Backend] {
			log.Info(fmt.Sprintf("Watch is not supported for backend %s. Exiting...", config.Backend))
			os.Exit(1)
		}
	}

	if config.Backend == "dynamodb" && config.Table == "" {
		return errors.New("No DynamoDB table configured")
	}
	// Template resource configs and template sources live in fixed
	// sub-directories of ConfDir.
	config.ConfigDir = filepath.Join(config.ConfDir, "conf.d")
	config.TemplateDir = filepath.Join(config.ConfDir, "templates")
	return nil
}

1.2. storeClient

log.Info("Starting confd")

storeClient, err := backends.New(config.BackendsConfig)
if err != nil {
	log.Fatal(err.Error())
}

根据配置文件中的存储后端类型构造一个存储后端的client,其中主要调用的函数为backends.New(config.BackendsConfig)

当没有设置存储后端时,默认为etcd

if config.Backend == "" {
	config.Backend = "etcd"
}
backendNodes := config.BackendNodes

当存储后端为file类型时,日志中打印的是YAML文件列表;其他类型则打印后端节点地址。

if config.Backend == "file" {
	log.Info("Backend source(s) set to " + strings.Join(config.YAMLFile, ", "))
} else {
	log.Info("Backend source(s) set to " + strings.Join(backendNodes, ", "))
}

最后再根据不同类型的存储后端,调用不同的存储后端构建函数,本文只分析redis类型的存储后端。

switch config.Backend {
case "consul":
	return consul.New(config.BackendNodes, config.Scheme,
		config.ClientCert, config.ClientKey,
		config.ClientCaKeys,
		config.BasicAuth,
		config.Username,
		config.Password,
	)
case "etcd":
	// Create the etcd client upfront and use it for the life of the process.
	// The etcdClient is an http.Client and designed to be reused.
	return etcd.NewEtcdClient(backendNodes, config.ClientCert, config.ClientKey, config.ClientCaKeys, config.BasicAuth, config.Username, config.Password)
case "etcdv3":
	return etcdv3.NewEtcdClient(backendNodes, config.ClientCert, config.ClientKey, config.ClientCaKeys, config.BasicAuth, config.Username, config.Password)
case "zookeeper":
	return zookeeper.NewZookeeperClient(backendNodes)
case "rancher":
	return rancher.NewRancherClient(backendNodes)
case "redis":
	return redis.NewRedisClient(backendNodes, config.ClientKey, config.Separator)
case "env":
	return env.NewEnvClient()
case "file":
	return file.NewFileClient(config.YAMLFile, config.Filter)
case "vault":
	vaultConfig := map[string]string{
		"app-id":    config.AppID,
		"user-id":   config.UserID,
		"role-id":   config.RoleID,
		"secret-id": config.SecretID,
		"username":  config.Username,
		"password":  config.Password,
		"token":     config.AuthToken,
		"cert":      config.ClientCert,
		"key":       config.ClientKey,
		"caCert":    config.ClientCaKeys,
		"path":      config.Path,
	}
	return vault.New(backendNodes[0], config.AuthType, vaultConfig)
case "dynamodb":
	table := config.Table
	log.Info("DynamoDB table set to " + table)
	return dynamodb.NewDynamoDBClient(table)
case "ssm":
	return ssm.New()
}
return nil, errors.New("Invalid backend")

其中redis类型的存储后端调用了NewRedisClient方法来构造redis的client。

case "redis":
	return redis.NewRedisClient(backendNodes, config.ClientKey, config.Separator)

其中涉及三个参数:

  • backendNodes:redis的节点地址。
  • ClientKey:redis的密码。
  • Separator:查找redis键的分隔符,该参数只用在redis类型。

NewRedisClient函数方法如下:

// NewRedisClient builds a Client for the given machines and makes an initial
// connection attempt through tryConnect. An empty separator defaults to "/".
// The Client is returned together with the connection error, so callers must
// check the error before using it.
func NewRedisClient(machines []string, password string, separator string) (*Client, error) {
	if separator == "" {
		separator = "/"
	}
	log.Debug(fmt.Sprintf("Redis Separator: %#v", separator))

	wrapper := &Client{
		machines:  machines,
		password:  password,
		separator: separator,
		client:    nil,
		pscChan:   make(chan watchResponse),
		psc:       redis.PubSubConn{Conn: nil},
	}

	conn, _, err := tryConnect(machines, password, true)
	wrapper.client = conn
	return wrapper, err
}

1.3. processor

stopChan := make(chan bool)
doneChan := make(chan bool)
errChan := make(chan error, 10)

var processor template.Processor
switch {
case config.Watch:
	processor = template.WatchProcessor(config.TemplateConfig, stopChan, doneChan, errChan)
default:
	processor = template.IntervalProcessor(config.TemplateConfig, stopChan, doneChan, errChan, config.Interval)
}

go processor.Process()

当开启watch参数的时候,则构造WatchProcessor,否则构造IntervalProcessor,最后起一个goroutine。

go processor.Process()

这块的逻辑在本文第二部分分析。

1.4. signalChan

signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
for {
	select {
	case err := <-errChan:
		log.Error(err.Error())
	case s := <-signalChan:
		log.Info(fmt.Sprintf("Captured %v. Exiting...", s))
		close(doneChan)
	case <-doneChan:
		os.Exit(0)
	}
}

2. Process

// Processor is implemented by the template processing strategies; main runs
// Process in its own goroutine.
type Processor interface {
	// Process runs the processing loop of the implementation.
	Process()
}

Processor是一个接口类型,主要的实现体有:

  • intervalProcessor:默认的实现体,即没有添加watch参数。
  • watchProcessor:添加watch参数的实现体。

2.1. intervalProcessor

// intervalProcessor re-processes all template resources on a fixed interval.
type intervalProcessor struct {
	config   Config     // passed to getTemplateResources on every cycle
	stopChan chan bool  // received from between cycles in Process
	doneChan chan bool  // closed when Process returns
	errChan  chan error // stored by the constructor; not used by Process as shown
	interval int        // pause between cycles, in seconds
}

IntervalProcessor函数根据config内容和几个channel构造一个intervalProcessor。

// IntervalProcessor returns a Processor that re-processes every template
// resource once per interval (in seconds).
func IntervalProcessor(config Config, stopChan, doneChan chan bool, errChan chan error, interval int) Processor {
	proc := &intervalProcessor{
		config:   config,
		stopChan: stopChan,
		doneChan: doneChan,
		errChan:  errChan,
		interval: interval,
	}
	return proc
}

2.1.1. intervalProcessor.Process

// Process loads the template resources and processes them, then waits for
// either a stop signal or the configured interval before repeating.
// doneChan is closed when Process returns.
//
// The exits use return rather than break: a bare break inside select only
// leaves the select statement, so the original loop could never be stopped
// through stopChan and would spin if stopChan were closed.
func (p *intervalProcessor) Process() {
	defer close(p.doneChan)
	for {
		ts, err := getTemplateResources(p.config)
		if err != nil {
			log.Fatal(err.Error())
			// Only reached if log.Fatal does not terminate the process.
			return
		}
		// Per-resource errors are logged inside process; the cycle continues
		// regardless, so the aggregate error is intentionally not inspected.
		process(ts)
		select {
		case <-p.stopChan:
			return
		case <-time.After(time.Duration(p.interval) * time.Second):
			// Interval elapsed; run the next cycle.
		}
	}
}

通过解析config内容获取TemplateResources,其中核心函数为process(ts),它会遍历每个TemplateResource并执行t.process(),该函数中会调用t.sync()。t.process()的具体逻辑后文分析。

// process runs every template resource in ts. A failure is logged and does
// not stop the remaining resources; the most recent error, or nil if all
// succeeded, is returned.
func process(ts []*TemplateResource) error {
	var lastErr error
	for i := range ts {
		err := ts[i].process()
		if err == nil {
			continue
		}
		log.Error(err.Error())
		lastErr = err
	}
	return lastErr
}

2.2. watchProcessor

// watchProcessor processes template resources whenever the store reports a
// change, using one monitorPrefix goroutine per resource.
type watchProcessor struct {
	config   Config         // passed to getTemplateResources once, at start
	stopChan chan bool      // handed to storeClient.WatchPrefix
	doneChan chan bool      // closed when Process returns
	errChan  chan error     // receives watch and processing errors
	wg       sync.WaitGroup // tracks the monitorPrefix goroutines
}

WatchProcessor函数根据config内容和几个channel构造一个watchProcessor。

// WatchProcessor returns a Processor that reacts to change notifications from
// the store. Its WaitGroup starts at the zero value.
func WatchProcessor(config Config, stopChan, doneChan chan bool, errChan chan error) Processor {
	proc := &watchProcessor{
		config:   config,
		stopChan: stopChan,
		doneChan: doneChan,
		errChan:  errChan,
	}
	return proc
}

2.2.1. watchProcessor.Process

// Process loads the template resources, starts one monitorPrefix goroutine
// per resource and blocks until all of them have finished. doneChan is closed
// when Process returns.
func (p *watchProcessor) Process() {
	defer close(p.doneChan)

	resources, err := getTemplateResources(p.config)
	if err != nil {
		log.Fatal(err.Error())
		return
	}

	// Register every watcher up front, then launch them.
	p.wg.Add(len(resources))
	for i := range resources {
		go p.monitorPrefix(resources[i])
	}
	p.wg.Wait()
}

watchProcessor.Process方法实现了Processor接口中定义的方法,通过解析config内容获取TemplateResources,再遍历TemplateResources执行monitorPrefix,有多少个TemplateResources就运行多少个monitorPrefix的goroutine。

2.2.2. monitorPrefix

// monitorPrefix watches the store for changes to the resource's keys and
// re-processes the resource after every successful WatchPrefix return. Watch
// and processing errors are sent on p.errChan.
//
// NOTE(review): the loop has no return or break, so the deferred p.wg.Done()
// is never reached from this function — confirm how shutdown is meant to work.
func (p *watchProcessor) monitorPrefix(t *TemplateResource) {
	defer p.wg.Done()
	// Full key paths: every key joined onto the resource prefix.
	keys := util.AppendPrefix(t.Prefix, t.Keys)
	for {
		index, err := t.storeClient.WatchPrefix(t.Prefix, keys, t.lastIndex, p.stopChan)
		if err != nil {
			p.errChan <- err
			// Prevent backend errors from consuming all resources.
			time.Sleep(time.Second * 2)
			continue
		}
		// Remember the index so the next WatchPrefix call continues from it.
		t.lastIndex = index
		if err := t.process(); err != nil {
			p.errChan <- err
		}
	}
}

先对配置文件中的prefix和keys参数进行拼接。

keys := util.AppendPrefix(t.Prefix, t.Keys)

AppendPrefix函数如下:

// AppendPrefix returns a new slice holding each key joined onto prefix with
// path.Join, in the same order as keys. The input slice is not modified.
func AppendPrefix(prefix string, keys []string) []string {
	joined := make([]string, 0, len(keys))
	for idx := range keys {
		joined = append(joined, path.Join(prefix, keys[idx]))
	}
	return joined
}

接着再执行storeClient的WatchPrefix方法,因为storeClient是一个接口,对应不同类型的存储后端,WatchPrefix的实现逻辑也不同,本文分析的存储类型为redis。

index, err := t.storeClient.WatchPrefix(t.Prefix, keys, t.lastIndex, p.stopChan)
if err != nil {
	p.errChan <- err
	// Prevent backend errors from consuming all resources.
	time.Sleep(time.Second * 2)
	continue
}

storeClient.WatchPrefix主要是获取lastIndex的值,这个值在t.process()中使用。

t.lastIndex = index
if err := t.process(); err != nil {
	p.errChan <- err
}

2.3. TemplateResource.process

无论是否加watch参数,即intervalProcessor和watchProcessor最终都会调用到TemplateResource.process这个函数,而这个函数中的核心函数为t.sync()。

// process runs the four stages that keep a destination file in sync, in
// order: resolve the file mode, load variables from the store, render the
// staged candidate file, and sync it to the destination.
// It stops at, and returns, the first error.
func (t *TemplateResource) process() error {
	stages := []func() error{
		t.setFileMode,
		t.setVars,
		t.createStageFile,
		t.sync,
	}
	for _, stage := range stages {
		if err := stage(); err != nil {
			return err
		}
	}
	return nil
}

2.3.1. setFileMode

setFileMode设置文件的权限:如果没有在配置文件指定mode参数,则当目标文件不存在时默认为0644,目标文件已存在时沿用其当前的权限;否则根据配置文件中指定的mode来设置文件权限。

// setFileMode resolves t.FileMode. An explicit t.Mode is parsed as an
// unsigned integer (base inferred from its prefix). Without one, an existing
// destination file keeps its current mode and a missing one gets 0644.
func (t *TemplateResource) setFileMode() error {
	if t.Mode != "" {
		parsed, err := strconv.ParseUint(t.Mode, 0, 32)
		if err != nil {
			return err
		}
		t.FileMode = os.FileMode(parsed)
		return nil
	}

	if !util.IsFileExist(t.Dest) {
		t.FileMode = 0644
		return nil
	}

	info, err := os.Stat(t.Dest)
	if err != nil {
		return err
	}
	t.FileMode = info.Mode()
	return nil
}

2.3.2. setVars

setVars将后端存储中最新的值拿出来暂存到内存中供后续进程使用。其中根据不同的后端,storeClient.GetValues的逻辑可能不同,但通过接口的方式可以让不同的存储后端实现不同的获取值的方法。

// setVars reloads the resource's in-memory store from the backend: it fetches
// the values for the prefixed keys, purges the store, and re-inserts each
// value under its key with the prefix stripped and rooted at "/".
func (t *TemplateResource) setVars() error {
	log.Debug("Retrieving keys from store")
	log.Debug("Key prefix set to " + t.Prefix)

	wanted := util.AppendPrefix(t.Prefix, t.Keys)
	values, err := t.storeClient.GetValues(wanted)
	if err != nil {
		return err
	}
	log.Debug("Got the following map from store: %v", values)

	// Purge only after a successful fetch, so a backend error leaves the
	// previously loaded values in place.
	t.store.Purge()
	for key, val := range values {
		relative := strings.TrimPrefix(key, t.Prefix)
		t.store.Set(path.Join("/", relative), val)
	}
	return nil
}

2.3.3. createStageFile

createStageFile通过src指定的template文件和内存中最新的变量数据生成StageFile,该文件在sync中和目标文件进行比较,看是否有修改。即StageFile实际上是根据后端存储生成的最新的配置文件,如果这份配置文件跟当前的配置文件不同,表明后端存储的数据被更新了,需要重新生成一份新的配置文件。

// createStageFile stages the src configuration file by processing the src
// template and setting the desired owner, group, and mode. It also sets the
// StageFile for the template resource.
// It returns an error if any.
//
// NOTE(review): t.StageFile is assigned the temp file whose Close is deferred
// here, so the handle is already closed once this function returns.
func (t *TemplateResource) createStageFile() error {
	log.Debug("Using source template " + t.Src)

	if !util.IsFileExist(t.Src) {
		return errors.New("Missing template: " + t.Src)
	}

	log.Debug("Compiling source template " + t.Src)

	// The template is named after the file's base name and parsed with the
	// resource's function map attached.
	tmpl, err := template.New(filepath.Base(t.Src)).Funcs(t.funcMap).ParseFiles(t.Src)
	if err != nil {
		return fmt.Errorf("Unable to process template %s, %s", t.Src, err)
	}

	// create TempFile in Dest directory to avoid cross-filesystem issues
	temp, err := ioutil.TempFile(filepath.Dir(t.Dest), "."+filepath.Base(t.Dest))
	if err != nil {
		return err
	}

	// The template is executed with nil data; presumably values reach it
	// through the functions in t.funcMap — TODO confirm.
	if err = tmpl.Execute(temp, nil); err != nil {
		// Do not leave a half-written stage file behind.
		temp.Close()
		os.Remove(temp.Name())
		return err
	}
	defer temp.Close()

	// Set the owner, group, and mode on the stage file now to make it easier to
	// compare against the destination configuration file later.
	// NOTE(review): the Chmod and Chown errors are discarded — confirm this is
	// deliberate best-effort.
	os.Chmod(temp.Name(), t.FileMode)
	os.Chown(temp.Name(), t.Uid, t.Gid)
	t.StageFile = temp
	return nil
}

2.3.4. sync

if err := t.sync(); err != nil {
	return err
}

t.sync()是执行confd核心功能的函数,将配置文件通过模板的方式自动生成,并执行检查命令和reload命令。该部分逻辑在本文第三部分分析。

3. sync

sync通过比较源文件和目标文件的差别,如果不同则重新生成新的配置。当设置了check_cmd和reload_cmd的时候,会先执行check_cmd指定的检查命令,如果没有问题再执行reload_cmd中指定的reload命令。

3.1. IsConfigChanged

IsConfigChanged比较源文件和目标文件是否相等,其中比较内容包括:Uid、Gid、Mode和Md5。只要其中任意值不同则认为两个文件不同。

// IsConfigChanged reports whether dest differs from src.
// It returns true when dest does not exist, when either file cannot be
// stat'ed (together with the error), or when the owner, group, mode or MD5
// checksum of the two files differ. Every differing attribute is logged.
// It returns false only when all four attributes match.
func IsConfigChanged(src, dest string) (bool, error) {
	if !IsFileExist(dest) {
		return true, nil
	}

	destInfo, err := FileStat(dest)
	if err != nil {
		return true, err
	}
	srcInfo, err := FileStat(src)
	if err != nil {
		return true, err
	}

	changed := false
	if destInfo.Uid != srcInfo.Uid {
		log.Info(fmt.Sprintf("%s has UID %d should be %d", dest, destInfo.Uid, srcInfo.Uid))
		changed = true
	}
	if destInfo.Gid != srcInfo.Gid {
		log.Info(fmt.Sprintf("%s has GID %d should be %d", dest, destInfo.Gid, srcInfo.Gid))
		changed = true
	}
	if destInfo.Mode != srcInfo.Mode {
		log.Info(fmt.Sprintf("%s has mode %s should be %s", dest, os.FileMode(destInfo.Mode), os.FileMode(srcInfo.Mode)))
		changed = true
	}
	if destInfo.Md5 != srcInfo.Md5 {
		log.Info(fmt.Sprintf("%s has md5sum %s should be %s", dest, destInfo.Md5, srcInfo.Md5))
		changed = true
	}
	return changed, nil
}

如果文件发生改变则执行check_cmd命令(有配置的情况下),重新生成配置文件,并执行reload_cmd命令(有配置的情况下)。

if ok {
	log.Info("Target config " + t.Dest + " out of sync")
	if !t.syncOnly && t.CheckCmd != "" {
		if err := t.check(); err != nil {
			return errors.New("Config check failed: " + err.Error())
		}
	}
	log.Debug("Overwriting target config " + t.Dest)
	err := os.Rename(staged, t.Dest)
	if err != nil {
		if strings.Contains(err.Error(), "device or resource busy") {
			log.Debug("Rename failed - target is likely a mount. Trying to write instead")
			// try to open the file and write to it
			var contents []byte
			var rerr error
			contents, rerr = ioutil.ReadFile(staged)
			if rerr != nil {
				return rerr
			}
			err := ioutil.WriteFile(t.Dest, contents, t.FileMode)
			// make sure owner and group match the temp file, in case the file was created with WriteFile
			os.Chown(t.Dest, t.Uid, t.Gid)
			if err != nil {
				return err
			}
		} else {
			return err
		}
	}
	if !t.syncOnly && t.ReloadCmd != "" {
		if err := t.reload(); err != nil {
			return err
		}
	}
	log.Info("Target config " + t.Dest + " has been updated")
} else {
	log.Debug("Target config " + t.Dest + " in sync")
}

3.2. check

check检查暂存的配置文件即stageFile,该文件是由最新的后端存储中的数据生成的。

if !t.syncOnly && t.CheckCmd != "" {
	if err := t.check(); err != nil {
		return errors.New("Config check failed: " + err.Error())
	}
}

t.check()只是执行配置文件中check_cmd参数指定的命令而已,根据是否执行成功来返回报错。当check命令产生错误的时候,则直接return报错,不再执行重新生成配置文件和reload的操作了。

// check validates the staged config file by running the configured check
// command. CheckCmd is rendered as a template in which {{.src}} expands to
// the path of the staged file, so the candidate is checked before it
// replaces the destination file.
// It returns nil only if rendering succeeds and the command exits with 0.
func (t *TemplateResource) check() error {
	vars := map[string]string{"src": t.StageFile.Name()}

	cmdTmpl, err := template.New("checkcmd").Parse(t.CheckCmd)
	if err != nil {
		return err
	}

	var rendered bytes.Buffer
	if err = cmdTmpl.Execute(&rendered, vars); err != nil {
		return err
	}
	return runCommand(rendered.String())
}

check会通过模板解析的方式解析出check_cmd中的{{.src}}部分,并用stageFile的路径来替代。即check的过程是:拉取最新后端存储的数据形成临时配置文件(stageFile),并通过指定的check_cmd来检查最新的临时配置文件是否合法,如果合法则替换为新的配置文件,否则返回错误。

3.3. Overwriting

staged文件命名为Dest文件的名字,读取staged文件中的内容并将它写入到Dest文件中,该过程实际上就是重新生成一份新的配置文件。staged文件的生成逻辑在函数createStageFile中。

log.Debug("Overwriting target config " + t.Dest)
err := os.Rename(staged, t.Dest)
if err != nil {
	if strings.Contains(err.Error(), "device or resource busy") {
		log.Debug("Rename failed - target is likely a mount. Trying to write instead")
		// try to open the file and write to it
		var contents []byte
		var rerr error
		contents, rerr = ioutil.ReadFile(staged)
		if rerr != nil {
			return rerr
		}
		err := ioutil.WriteFile(t.Dest, contents, t.FileMode)
		// make sure owner and group match the temp file, in case the file was created with WriteFile
		os.Chown(t.Dest, t.Uid, t.Gid)
		if err != nil {
			return err
		}
	} else {
		return err
	}
}

3.4. reload

如果没有指定syncOnly参数并且指定了ReloadCmd则执行reload操作。

if !t.syncOnly && t.ReloadCmd != "" {
	if err := t.reload(); err != nil {
		return err
	}
}

其中t.reload()实现如下:

// reload runs the configured reload command through runCommand.
// It returns nil when the command exits with 0.
func (t *TemplateResource) reload() error {
	reloadCmd := t.ReloadCmd
	return runCommand(reloadCmd)
}

t.reload()和t.check()都调用了runCommand函数:

// runCommand executes cmd through the platform shell ("cmd /C" on Windows,
// "/bin/sh -c" elsewhere) and logs its combined stdout/stderr: at error
// level when the command fails, at debug level otherwise.
// It is shared by check and reload and returns nil when cmd exits with 0.
func runCommand(cmd string) error {
	log.Debug("Running " + cmd)

	shell, shellArg := "/bin/sh", "-c"
	if runtime.GOOS == "windows" {
		shell, shellArg = "cmd", "/C"
	}

	output, err := exec.Command(shell, shellArg, cmd).CombinedOutput()
	quoted := fmt.Sprintf("%q", string(output))
	if err != nil {
		log.Error(quoted)
		return err
	}
	log.Debug(quoted)
	return nil
}

4. redisClient.WatchPrefix

当用户设置了watch参数并且存储后端为redis时,会调用redisClient.WatchPrefix,它依赖redis的watch机制。redisClient.WatchPrefix是redis存储类型对StoreClient接口中WatchPrefix方法的实现。

// The StoreClient interface is implemented by objects that can retrieve
// key/value pairs from a backend store.
type StoreClient interface {
	// GetValues returns the values found for the given keys as a map.
	GetValues(keys []string) (map[string]string, error)
	// WatchPrefix blocks until the backend reports a change, an error occurs,
	// or stopChan fires, and returns the index to pass as waitIndex on the
	// next call.
	WatchPrefix(prefix string, keys []string, waitIndex uint64, stopChan chan bool) (uint64, error)
}

StoreClient是对后端存储类型的抽象,常用的后端存储类型有Etcd和Redis等,不同的后端存储类型GetValues和WatchPrefix的具体实现不同,本文主要分析Redis类型的watch机制。

4.1. WatchPrefix

WatchPrefix的调用函数在monitorPrefix的部分,具体参考:

// monitorPrefix watches the store for changes to the resource's keys and
// re-processes the resource after every successful WatchPrefix return. Watch
// and processing errors are sent on p.errChan.
//
// NOTE(review): the loop has no return or break, so the deferred p.wg.Done()
// is never reached from this function — confirm how shutdown is meant to work.
func (p *watchProcessor) monitorPrefix(t *TemplateResource) {
	defer p.wg.Done()
	// Full key paths: every key joined onto the resource prefix.
	keys := util.AppendPrefix(t.Prefix, t.Keys)
	for {
		index, err := t.storeClient.WatchPrefix(t.Prefix, keys, t.lastIndex, p.stopChan)
		if err != nil {
			p.errChan <- err
			// Prevent backend errors from consuming all resources.
			time.Sleep(time.Second * 2)
			continue
		}
		// Remember the index so the next WatchPrefix call continues from it.
		t.lastIndex = index
		if err := t.process(); err != nil {
			p.errChan <- err
		}
	}
}

redis的watch主要通过pub-sub的机制,即WatchPrefix会根据传入的prefix起一个sub的监听机制,而在写入redis的数据的同时需要执行redis的publish操作,channel为符合prefix的值,value为下面给定命令列表中的任意一个(具体是哪个命令并没有关系),这样就会触发watch机制,从而自动更新配置。给定的命令列表如下:

"del", "append", "rename_from", "rename_to", "expire", "set", "incrby", "incrbyfloat", "hset", "hincrby", "hincrbyfloat", "hdel"

sub监听的key的格式如下:

__keyspace@0__:{prefix}/*

如果只是写入redis数据,而既没有开启redis的键空间通知(notify-keyspace-events配置项,默认关闭;开启后redis会在key变更时自动向__keyspace@<db>__:<key>发布事件),也没有手动执行publish的操作,并不会触发redis的watch机制来自动更新配置。但是如果使用etcd,则etcd的watch机制,只需要用户写入或更新数据就可以自动触发更新配置。

WatchPrefix源码如下:

// WatchPrefix blocks until a Redis keyspace event arrives for a key matching
// prefix, an error is reported, or stopChan fires. The first call
// (waitIndex == 0) returns index 1 immediately without subscribing.
//
// A pattern subscription on "__keyspace@<db>__:<prefix>*" is set up lazily on
// a separate pub/sub connection, and a receiver goroutine forwards events to
// c.pscChan. The keys argument is not used by this implementation.
//
// NOTE(review): c.psc is read and written from several goroutines with no
// visible synchronization — confirm whether that is safe.
func (c *Client) WatchPrefix(prefix string, keys []string, waitIndex uint64, stopChan chan bool) (uint64, error) {
		
	if waitIndex == 0 {
		return 1, nil
	}

	// Drain queued responses and return the newest one.
	// NOTE(review): NewRedisClient creates pscChan unbuffered, so len() is
	// always 0 and this branch looks unreachable with that constructor.
	if len(c.pscChan) > 0 {
		var respChan watchResponse
		for len(c.pscChan) > 0 {
			respChan = <-c.pscChan
		}
		return respChan.waitIndex, respChan.err
	}

	// Set up the subscription in the background so the select below can
	// already wait on stopChan and pscChan.
	go func() {
		if c.psc.Conn == nil {
			rClient, db, err := tryConnect(c.machines, c.password, false);
	
			if err != nil {
				c.psc = redis.PubSubConn{Conn: nil}
				c.pscChan <- watchResponse{0, err}
				return
			}
		
			c.psc = redis.PubSubConn{Conn: rClient}		

			// Receiver loop: runs until the subscription count drops to zero
			// or the connection errors, then closes and clears c.psc so the
			// next WatchPrefix call reconnects.
			go func() {
				defer func() {
					c.psc.Close()
					c.psc = redis.PubSubConn{Conn: nil}
				}()
				for {
					switch n := c.psc.Receive().(type) {
						case redis.PMessage:
							log.Debug(fmt.Sprintf("Redis Message: %s %s\n", n.Channel, n.Data))
							data := string(n.Data)
							// Only these event names trigger a refresh; any
							// other payload is ignored.
							commands := [12]string{"del", "append", "rename_from", "rename_to", "expire", "set", "incrby", "incrbyfloat", "hset", "hincrby", "hincrbyfloat", "hdel"}
							for _, command := range commands {
								if command == data {
									c.pscChan <- watchResponse{1, nil}
									break
								}
							}
						case redis.Subscription:
							log.Debug(fmt.Sprintf("Redis Subscription: %s %s %d\n", n.Kind, n.Channel, n.Count))
							// Count 0 means no subscriptions remain: report
							// index 0 and stop receiving.
							if n.Count == 0 {
								c.pscChan <- watchResponse{0, nil}
								return
							}
						case error:
							log.Debug(fmt.Sprintf("Redis error: %v\n", n))
							c.pscChan <- watchResponse{0, n}
							return
					}
				}
			}()
			
			// Subscribe to keyspace events of the connected database for keys
			// starting with the transformed prefix.
			c.psc.PSubscribe("__keyspace@" + strconv.Itoa(db) + "__:" + c.transform(prefix) + "*")
		}
	}()

	select {
	case <-stopChan:
		// Drop all pattern subscriptions and hand back the caller's index.
		c.psc.PUnsubscribe()
		return waitIndex, nil
	case r := <- c.pscChan:
		return r.waitIndex, r.err
	}
}

5. 总结

  1. confd的作用是通过将配置存放到存储后端,来自动触发更新配置的功能,其中常用的后端有Etcd和Redis等。
  2. 不同的存储后端,watch机制不同,例如Etcd只需要更新key便可以触发自动更新配置的操作,而redis除了更新key还需要执行publish的操作。
  3. 可以通过配置check_cmd来校验配置文件是否正确,如果配置文件非法则不会执行自动更新配置和reload的操作。但是当存储后端存入了非法数据时,会导致每次校验都失败,即使后面新增的配置部分是合法的,所以需要有机制来保证存入存储后端的数据始终是合法的。

参考:


最后修改 July 17, 2022: Fix/bug (#1) (d9e9197)