// Copyright (c) 2014-2015 The Notify Authors. All rights reserved.
// Use of this source code is governed by the MIT license that can be
// found in the LICENSE file.

// +build darwin,!kqueue

package notify

/*
#include <CoreServices/CoreServices.h>

typedef void (*CFRunLoopPerformCallBack)(void*);

void gosource(void *);
void gostream(uintptr_t, uintptr_t, size_t, uintptr_t, uintptr_t, uintptr_t);

static FSEventStreamRef EventStreamCreate(FSEventStreamContext * context, uintptr_t info, CFArrayRef paths, FSEventStreamEventId since, CFTimeInterval latency, FSEventStreamCreateFlags flags) {
	context->info = (void*) info;
	return FSEventStreamCreate(NULL, (FSEventStreamCallback) gostream, context, paths, since, latency, flags);
}

#cgo LDFLAGS: -framework CoreServices
*/
import "C"

import (
	"errors"
	"os"
	"runtime"
	"sync"
	"sync/atomic"
	"unsafe"
)

// nilstream is the zero value of C.FSEventStreamRef. Stream references are
// compared against it to tell whether a stream has been created.
var nilstream C.FSEventStreamRef

// Default arguments for the FSEventStreamCreate function.
var (
	// latency is left at its zero value.
	latency C.CFTimeInterval
	// flags requests file-level events and disables deferring of events.
	flags = C.FSEventStreamCreateFlags(C.kFSEventStreamCreateFlagFileEvents | C.kFSEventStreamCreateFlagNoDefer)
	// since is the event ID that new streams start from. It is initialised to
	// the event ID current at package initialisation and is accessed with
	// sync/atomic by the code that reads and updates it.
	since = uint64(C.FSEventsGetCurrentEventId())
)

// runloop is the global runloop which all streams are registered with.
var runloop C.CFRunLoopRef

// wg is used to wait until the runloop starts.
var wg sync.WaitGroup

// source is used for synchronization purposes: its perform callback
// (gosource) signals, via wg, that the runloop has started and is ready.
// It also serves as a dummy source: because the runloop always has at
// least this one source registered, it does not return.
var source = C.CFRunLoopSourceCreate(C.kCFAllocatorDefault, 0, &C.CFRunLoopSourceContext{
	perform: (C.CFRunLoopPerformCallBack)(C.gosource),
})

// Errors returned when FSEvents functions fail.
56var ( 57 errCreate = os.NewSyscallError("FSEventStreamCreate", errors.New("NULL")) 58 errStart = os.NewSyscallError("FSEventStreamStart", errors.New("false")) 59) 60 61// initializes the global runloop and ensures any created stream awaits its 62// readiness. 63func init() { 64 wg.Add(1) 65 go func() { 66 // There is exactly one run loop per thread. Lock this goroutine to its 67 // thread to ensure that it's not rescheduled on a different thread while 68 // setting up the run loop. 69 runtime.LockOSThread() 70 runloop = C.CFRunLoopGetCurrent() 71 C.CFRunLoopAddSource(runloop, source, C.kCFRunLoopDefaultMode) 72 C.CFRunLoopRun() 73 panic("runloop has just unexpectedly stopped") 74 }() 75 C.CFRunLoopSourceSignal(source) 76} 77 78//export gosource 79func gosource(unsafe.Pointer) { 80 wg.Done() 81} 82 83//export gostream 84func gostream(_, info uintptr, n C.size_t, paths, flags, ids uintptr) { 85 const ( 86 offchar = unsafe.Sizeof((*C.char)(nil)) 87 offflag = unsafe.Sizeof(C.FSEventStreamEventFlags(0)) 88 offid = unsafe.Sizeof(C.FSEventStreamEventId(0)) 89 ) 90 if n == 0 { 91 return 92 } 93 fn := streamFuncs.get(info) 94 if fn == nil { 95 return 96 } 97 ev := make([]FSEvent, 0, int(n)) 98 for i := uintptr(0); i < uintptr(n); i++ { 99 switch flags := *(*uint32)(unsafe.Pointer((flags + i*offflag))); { 100 case flags&uint32(FSEventsEventIdsWrapped) != 0: 101 atomic.StoreUint64(&since, uint64(C.FSEventsGetCurrentEventId())) 102 default: 103 ev = append(ev, FSEvent{ 104 Path: C.GoString(*(**C.char)(unsafe.Pointer(paths + i*offchar))), 105 Flags: flags, 106 ID: *(*uint64)(unsafe.Pointer(ids + i*offid)), 107 }) 108 } 109 110 } 111 fn(ev) 112} 113 114// StreamFunc is a callback called when stream receives file events. 
115type streamFunc func([]FSEvent) 116 117var streamFuncs = streamFuncRegistry{m: map[uintptr]streamFunc{}} 118 119type streamFuncRegistry struct { 120 mu sync.Mutex 121 m map[uintptr]streamFunc 122 i uintptr 123} 124 125func (r *streamFuncRegistry) get(id uintptr) streamFunc { 126 r.mu.Lock() 127 defer r.mu.Unlock() 128 return r.m[id] 129} 130 131func (r *streamFuncRegistry) add(fn streamFunc) uintptr { 132 r.mu.Lock() 133 defer r.mu.Unlock() 134 r.i++ 135 r.m[r.i] = fn 136 return r.i 137} 138 139func (r *streamFuncRegistry) delete(id uintptr) { 140 r.mu.Lock() 141 defer r.mu.Unlock() 142 delete(r.m, id) 143} 144 145// Stream represents single watch-point which listens for events scheduled by 146// the global runloop. 147type stream struct { 148 path string 149 ref C.FSEventStreamRef 150 info uintptr 151} 152 153// NewStream creates a stream for given path, listening for file events and 154// calling fn upon receiving any. 155func newStream(path string, fn streamFunc) *stream { 156 return &stream{ 157 path: path, 158 info: streamFuncs.add(fn), 159 } 160} 161 162// Start creates a FSEventStream for the given path and schedules it with 163// global runloop. It's a nop if the stream was already started. 
164func (s *stream) Start() error { 165 if s.ref != nilstream { 166 return nil 167 } 168 wg.Wait() 169 p := C.CFStringCreateWithCStringNoCopy(C.kCFAllocatorDefault, C.CString(s.path), C.kCFStringEncodingUTF8, C.kCFAllocatorDefault) 170 path := C.CFArrayCreate(C.kCFAllocatorDefault, (*unsafe.Pointer)(unsafe.Pointer(&p)), 1, nil) 171 ctx := C.FSEventStreamContext{} 172 ref := C.EventStreamCreate(&ctx, C.uintptr_t(s.info), path, C.FSEventStreamEventId(atomic.LoadUint64(&since)), latency, flags) 173 if ref == nilstream { 174 return errCreate 175 } 176 C.FSEventStreamScheduleWithRunLoop(ref, runloop, C.kCFRunLoopDefaultMode) 177 if C.FSEventStreamStart(ref) == C.Boolean(0) { 178 C.FSEventStreamInvalidate(ref) 179 return errStart 180 } 181 C.CFRunLoopWakeUp(runloop) 182 s.ref = ref 183 return nil 184} 185 186// Stop stops underlying FSEventStream and unregisters it from global runloop. 187func (s *stream) Stop() { 188 if s.ref == nilstream { 189 return 190 } 191 wg.Wait() 192 C.FSEventStreamStop(s.ref) 193 C.FSEventStreamInvalidate(s.ref) 194 C.CFRunLoopWakeUp(runloop) 195 s.ref = nilstream 196 streamFuncs.delete(s.info) 197} 198