Merge pull request #680 from printfcoder/master
fix file watcher event bug on Linux
This commit is contained in:
		| @@ -1,6 +1,7 @@ | |||||||
| package config | package config | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
|  | 	"encoding/json" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"os" | 	"os" | ||||||
| 	"path/filepath" | 	"path/filepath" | ||||||
| @@ -8,10 +9,15 @@ import ( | |||||||
| 	"testing" | 	"testing" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
|  | 	"github.com/google/uuid" | ||||||
| 	"github.com/micro/go-micro/config/source/env" | 	"github.com/micro/go-micro/config/source/env" | ||||||
| 	"github.com/micro/go-micro/config/source/file" | 	"github.com/micro/go-micro/config/source/file" | ||||||
| ) | ) | ||||||
|  |  | ||||||
|  | var ( | ||||||
|  | 	sep = string(os.PathSeparator) | ||||||
|  | ) | ||||||
|  |  | ||||||
| func createFileForIssue18(t *testing.T, content string) *os.File { | func createFileForIssue18(t *testing.T, content string) *os.File { | ||||||
| 	data := []byte(content) | 	data := []byte(content) | ||||||
| 	path := filepath.Join(os.TempDir(), fmt.Sprintf("file.%d", time.Now().UnixNano())) | 	path := filepath.Join(os.TempDir(), fmt.Sprintf("file.%d", time.Now().UnixNano())) | ||||||
| @@ -116,3 +122,57 @@ func TestConfigMerge(t *testing.T) { | |||||||
| 			actualHost) | 			actualHost) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func TestFileChange(t *testing.T) { | ||||||
|  | 	// create a temp file | ||||||
|  | 	fileName := uuid.New().String() + "testWatcher.json" | ||||||
|  | 	f, err := os.OpenFile("."+sep+fileName, os.O_WRONLY|os.O_CREATE, 0666) | ||||||
|  | 	if err != nil { | ||||||
|  | 		t.Error(err) | ||||||
|  | 	} | ||||||
|  | 	defer f.Close() | ||||||
|  | 	defer os.Remove("." + sep + fileName) | ||||||
|  |  | ||||||
|  | 	// load the file | ||||||
|  | 	if err := Load(file.NewSource( | ||||||
|  | 		file.WithPath("." + sep + fileName), | ||||||
|  | 	)); err != nil { | ||||||
|  | 		t.Error(err) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// watch changes | ||||||
|  | 	watcher, err := Watch() | ||||||
|  | 	if err != nil { | ||||||
|  | 		t.Error(err) | ||||||
|  | 	} | ||||||
|  | 	changeTimes := 0 | ||||||
|  | 	go func() { | ||||||
|  | 		for { | ||||||
|  | 			v, err := watcher.Next() | ||||||
|  | 			if err != nil { | ||||||
|  | 				t.Error(err) | ||||||
|  | 				return | ||||||
|  | 			} | ||||||
|  | 			changeTimes++ | ||||||
|  | 			t.Logf("file change,%s", string(v.Bytes())) | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  |  | ||||||
|  | 	content := map[int]string{} | ||||||
|  | 	// change the file | ||||||
|  | 	for i := 0; i < 5; i++ { | ||||||
|  | 		content[i] = time.Now().String() | ||||||
|  | 		bytes, _ := json.Marshal(content) | ||||||
|  | 		f.Truncate(0) | ||||||
|  | 		f.Seek(0, 0) | ||||||
|  | 		if _, err := f.Write(bytes); err != nil { | ||||||
|  | 			t.Error(err) | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		time.Sleep(time.Second) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if changeTimes != 4 { | ||||||
|  | 		t.Error(fmt.Errorf("watcher error: change times %d is not enough", changeTimes)) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|   | |||||||
| @@ -1,3 +1,5 @@ | |||||||
|  | //+build !linux | ||||||
|  |  | ||||||
| package file | package file | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
|   | |||||||
							
								
								
									
										72
									
								
								config/source/file/watcher_linux.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										72
									
								
								config/source/file/watcher_linux.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,72 @@ | |||||||
|  | //+build linux | ||||||
|  |  | ||||||
|  | package file | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"errors" | ||||||
|  | 	"os" | ||||||
|  |  | ||||||
|  | 	"github.com/fsnotify/fsnotify" | ||||||
|  | 	"github.com/micro/go-micro/config/source" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | type watcher struct { | ||||||
|  | 	f *file | ||||||
|  |  | ||||||
|  | 	fw   *fsnotify.Watcher | ||||||
|  | 	exit chan bool | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func newWatcher(f *file) (source.Watcher, error) { | ||||||
|  | 	fw, err := fsnotify.NewWatcher() | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	fw.Add(f.path) | ||||||
|  |  | ||||||
|  | 	return &watcher{ | ||||||
|  | 		f:    f, | ||||||
|  | 		fw:   fw, | ||||||
|  | 		exit: make(chan bool), | ||||||
|  | 	}, nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (w *watcher) Next() (*source.ChangeSet, error) { | ||||||
|  | 	// is it closed? | ||||||
|  | 	select { | ||||||
|  | 	case <-w.exit: | ||||||
|  | 		return nil, errors.New("watcher stopped") | ||||||
|  | 	default: | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// try get the event | ||||||
|  | 	select { | ||||||
|  | 	case event, _ := <-w.fw.Events: | ||||||
|  | 		if event.Op == fsnotify.Rename { | ||||||
|  | 			// check existence of file, and add watch again | ||||||
|  | 			_, err := os.Stat(event.Name) | ||||||
|  | 			if err == nil || os.IsExist(err) { | ||||||
|  | 				w.fw.Add(event.Name) | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		c, err := w.f.Read() | ||||||
|  | 		if err != nil { | ||||||
|  | 			return nil, err | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		// add path again for the event bug of fsnotify | ||||||
|  | 		w.fw.Add(w.f.path) | ||||||
|  |  | ||||||
|  | 		return c, nil | ||||||
|  | 	case err := <-w.fw.Errors: | ||||||
|  | 		return nil, err | ||||||
|  | 	case <-w.exit: | ||||||
|  | 		return nil, errors.New("watcher stopped") | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (w *watcher) Stop() error { | ||||||
|  | 	return w.fw.Close() | ||||||
|  | } | ||||||
		Reference in New Issue
	
	Block a user