
observer

GoDoc

Experimental concurrency primitives!

Subject[T]

Subject is a generic unbounded concurrent observer. It is built on an atomic counter per fixed-length array: adding an element increments the counter to select the insert position, and if the counter exceeds the array length a new array is allocated and the process repeats on the new array. Readers hold a pointer to a position in the array. It replaces a list of channels.

var s observer.Subject[string]
go s.Set("hello") // writers append values concurrently

// A reader walks the values from its current view.
go func() {
	for v := s.View(); ; v = v.Next() {
		fmt.Println(v.Value())
	}
}()
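
The write path described above can be sketched roughly as follows. This is a minimal illustration under assumed names (node, size, add) and an assumed array length, not the package's actual implementation, and it omits the coordination readers need to learn that a slot has been filled.

import "sync/atomic"

const size = 32 // assumed fixed array length

// node is one fixed-length array in the chain (hypothetical name).
type node[T any] struct {
	count atomic.Int64            // insert counter for this array
	vals  [size]T                 // slots claimed via the counter
	next  atomic.Pointer[node[T]] // next array, allocated on overflow
}

// add claims a slot by incrementing the counter; if the array is full
// it moves to (or allocates) the next array and repeats there.
func (n *node[T]) add(v T) {
	for {
		if i := n.count.Add(1) - 1; i < size {
			n.vals[i] = v
			return
		}
		next := n.next.Load()
		if next == nil {
			next = &node[T]{}
			if !n.next.CompareAndSwap(nil, next) {
				next = n.next.Load() // another writer allocated it first
			}
		}
		n = next
	}
}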

Benchmark

Observer allocations are amortized over the array length. It is faster than a single channel, and cost scales linearly when increasing the number of readers, unlike an array of channels. The trade-off is that the queue becomes unbounded.

BenchmarkObserver/1-8    35960995  34.22 ns/op  9 B/op  0 allocs/op
BenchmarkObserver/8-8     7084509  145.5 ns/op  9 B/op  0 allocs/op
BenchmarkObserver/32-8   14361388  94.58 ns/op  9 B/op  0 allocs/op
BenchmarkChannel/1-8     20000000   74.2 ns/op  0 B/op  0 allocs/op
BenchmarkChannel/8-8      1000000   1353 ns/op  0 B/op  0 allocs/op
BenchmarkChannel/32-8      200000   6261 ns/op  0 B/op  0 allocs/op

Map

Map is a concurrent map similar to the builtin sync.Map but with a different locking strategy. The map is duplicated into a read/write pair that is switched based on three atomic counters: one counter shared between the two maps and one counter per map. When loading, the shared counter is incremented; this flags which map to read from. Once the read is finished, that map's counter is incremented. Writers wait for all readers of the map to leave, then swap the read/write flag so the next write can be processed.

This has some benefits and trade-offs in read/write workloads; see the benchmarks. There is no concurrent process, such as compaction, running alongside the map, which has the benefit of maintaining throughput under sustained load.
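
As a rough sketch of the idea, and only that: the snippet below uses a read index plus a reader counter per map, in the style of a classic left-right scheme, rather than the package's exact three-counter layout or its real types; writers are serialised with a mutex, which is an assumption of the sketch.

import (
	"runtime"
	"sync"
	"sync/atomic"
)

// lrMap is a hypothetical sketch, not observer.Map: two plain maps, a
// read index and a reader counter per map.
type lrMap struct {
	mu      sync.Mutex // serialises writers (assumption)
	read    atomic.Int64 // which map readers should use: 0 or 1
	readers [2]atomic.Int64
	maps    [2]map[string]any
}

func newLRMap() *lrMap {
	return &lrMap{maps: [2]map[string]any{{}, {}}}
}

func (m *lrMap) Get(k string) (any, bool) {
	for {
		idx := m.read.Load()
		m.readers[idx].Add(1)
		if m.read.Load() == idx { // still the read map, safe to use
			v, ok := m.maps[idx][k]
			m.readers[idx].Add(-1)
			return v, ok
		}
		m.readers[idx].Add(-1) // raced with a swap, retry
	}
}

func (m *lrMap) Set(k string, v any) {
	m.mu.Lock()
	defer m.mu.Unlock()
	w := 1 - m.read.Load()
	m.maps[w][k] = v // apply the write to the hidden map
	m.read.Store(w)  // new readers now use the updated map
	for m.readers[1-w].Load() != 0 {
		runtime.Gosched() // wait for readers of the old map to leave
	}
	m.maps[1-w][k] = v // repeat the write on the old map
}

In this sketch reads avoid locks entirely, while each write is applied twice and waits for readers of the previous map to drain.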

A Range operation isn't supported, unlike sync.Map, as it would lock the entire map. However, there is a Tx method that supports a transaction-like read/write, useful for counters and optional updates.

var m observer.Map
m.Set("key", 1)

// Increment the value in a single transaction if the key is present.
m.Tx("key", func(val interface{}, ok bool) (interface{}, bool) {
	if ok {
		return val.(int) + 1, ok
	}
	return nil, false
})
val, ok := m.Get("key")

Benchmark

The benchmark is based on Dgraph's cache testing.

Key: Map is this package's Map, SyncMap is sync.Map, and RWMap is a Go map with a sync.Mutex to protect it.

BenchmarkCaches/MapZipfRead
BenchmarkCaches/MapZipfRead-8           21317090                54.80 ns/op            0 B/op          0 allocs/op
BenchmarkCaches/SyncMapZipfRead
BenchmarkCaches/SyncMapZipfRead-8       22807447                54.58 ns/op            0 B/op          0 allocs/op
BenchmarkCaches/RWMapZipfRead
BenchmarkCaches/RWMapZipfRead-8         27089253                45.28 ns/op            0 B/op          0 allocs/op
BenchmarkCaches/MapOneKeyRead
BenchmarkCaches/MapOneKeyRead-8         31896468                38.77 ns/op            0 B/op          0 allocs/op
BenchmarkCaches/SyncMapOneKeyRead
BenchmarkCaches/SyncMapOneKeyRead-8     100000000               10.91 ns/op            0 B/op          0 allocs/op
BenchmarkCaches/RWMapOneKeyRead
BenchmarkCaches/RWMapOneKeyRead-8       28684920                41.93 ns/op            0 B/op          0 allocs/op
BenchmarkCaches/MapZipIfWrite
BenchmarkCaches/MapZipIfWrite-8          2000820               610.6 ns/op            55 B/op          3 allocs/op
BenchmarkCaches/SyncMapZipfWrite
BenchmarkCaches/SyncMapZipfWrite-8       2193643               550.7 ns/op            71 B/op          5 allocs/op
BenchmarkCaches/RWMapZipfWrite
BenchmarkCaches/RWMapZipfWrite-8         4045146               298.6 ns/op            15 B/op          1 allocs/op
BenchmarkCaches/MapOneIfWrite
BenchmarkCaches/MapOneIfWrite-8          2679439               446.5 ns/op            56 B/op          4 allocs/op
BenchmarkCaches/SyncMapOneKeyWrite
BenchmarkCaches/SyncMapOneKeyWrite-8     3835924               312.0 ns/op            72 B/op          5 allocs/op
BenchmarkCaches/RWMapOneKeyWrite
BenchmarkCaches/RWMapOneKeyWrite-8       5000916               240.5 ns/op            16 B/op          2 allocs/op
BenchmarkCaches/MapZipfMixed
BenchmarkCaches/MapZipfMixed-8          16913481                68.68 ns/op            2 B/op          0 allocs/op
BenchmarkCaches/SyncMapZipfMixed
BenchmarkCaches/SyncMapZipfMixed-8      19314884                58.52 ns/op           14 B/op          0 allocs/op
BenchmarkCaches/RWMapZipfMixed
BenchmarkCaches/RWMapZipfMixed-8         2572059               443.6 ns/op             1 B/op          0 allocs/op
BenchmarkCaches/MapOneKeyMixed
BenchmarkCaches/MapOneKeyMixed-8        26252911                45.45 ns/op            2 B/op          0 allocs/op
BenchmarkCaches/SyncMapOneKeyMixed
BenchmarkCaches/SyncMapOneKeyMixed-8    64579701                18.14 ns/op            6 B/op          0 allocs/op
BenchmarkCaches/RWMapOneKeyMixed
BenchmarkCaches/RWMapOneKeyMixed-8       8419248               141.7 ns/op             0 B/op          0 allocs/op
