Skip to content

Commit 7b92f28

Browse files
committed
refactor: switch cache to per-register storage
Cache keys now address individual registers/coils instead of request ranges. This fixes stale data when writes hit registers that are part of a larger cached read range. Key changes: - RegKey(slaveID, fc, addr) for per-register storage - RangeKey(slaveID, fc, addr, qty) for request coalescing - GetRange/SetRange/DeleteRange for batch operations - Rename GetOrFetch to Coalesce (no longer interacts with cache) - Add keepStale flag to prevent cleanup from removing entries needed for stale-serve fallback
1 parent 52606bd commit 7b92f28

File tree

2 files changed

+286
-66
lines changed

2 files changed

+286
-66
lines changed

internal/cache/cache.go

Lines changed: 100 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,12 @@ func (e *Entry) IsExpired() bool {
2020
return time.Since(e.Timestamp) > e.TTL
2121
}
2222

23-
// Cache is a thread-safe in-memory cache with TTL.
23+
// Cache is a thread-safe in-memory cache with TTL and per-register storage.
2424
type Cache struct {
2525
mu sync.RWMutex
2626
entries map[string]*Entry
2727
defaultTTL time.Duration
28+
keepStale bool // when true, cleanup won't delete expired entries
2829

2930
// For request coalescing
3031
inflight map[string]*inflightRequest
@@ -41,10 +42,12 @@ type inflightRequest struct {
4142
}
4243

4344
// New creates a new cache with the specified default TTL.
44-
func New(defaultTTL time.Duration) *Cache {
45+
// If keepStale is true, expired entries are retained for stale serving.
46+
func New(defaultTTL time.Duration, keepStale bool) *Cache {
4547
c := &Cache{
4648
entries: make(map[string]*Entry),
4749
defaultTTL: defaultTTL,
50+
keepStale: keepStale,
4851
inflight: make(map[string]*inflightRequest),
4952
done: make(chan struct{}),
5053
}
@@ -60,9 +63,14 @@ func (c *Cache) Close() {
6063
close(c.done)
6164
}
6265

63-
// Key generates a cache key from request parameters.
64-
func Key(slaveID byte, functionCode byte, address uint16, quantity uint16) string {
65-
return fmt.Sprintf("%d:%d:%d:%d", slaveID, functionCode, address, quantity)
66+
// RegKey generates a cache key for a single register or coil.
67+
func RegKey(slaveID byte, functionCode byte, address uint16) string {
68+
return fmt.Sprintf("%d:%d:%d", slaveID, functionCode, address)
69+
}
70+
71+
// RangeKey generates a coalescing key for a request range.
72+
func RangeKey(slaveID byte, functionCode byte, startAddr uint16, quantity uint16) string {
73+
return fmt.Sprintf("%d:%d:%d:%d", slaveID, functionCode, startAddr, quantity)
6674
}
6775

6876
// Get retrieves a value from the cache.
@@ -100,11 +108,6 @@ func (c *Cache) GetStale(key string) ([]byte, bool) {
100108

101109
// Set stores a value in the cache with the default TTL.
102110
func (c *Cache) Set(key string, data []byte) {
103-
c.SetWithTTL(key, data, c.defaultTTL)
104-
}
105-
106-
// SetWithTTL stores a value in the cache with a specific TTL.
107-
func (c *Cache) SetWithTTL(key string, data []byte, ttl time.Duration) {
108111
c.mu.Lock()
109112
defer c.mu.Unlock()
110113

@@ -115,7 +118,7 @@ func (c *Cache) SetWithTTL(key string, data []byte, ttl time.Duration) {
115118
c.entries[key] = &Entry{
116119
Data: dataCopy,
117120
Timestamp: time.Now(),
118-
TTL: ttl,
121+
TTL: c.defaultTTL,
119122
}
120123
}
121124

@@ -126,32 +129,95 @@ func (c *Cache) Delete(key string) {
126129
delete(c.entries, key)
127130
}
128131

129-
// GetOrFetch retrieves a value from the cache or fetches it using the provided function.
130-
// This implements request coalescing - multiple concurrent requests for the same key
131-
// will share a single fetch operation.
132-
// Returns the data, a boolean indicating if it was a cache hit, and any error.
133-
func (c *Cache) GetOrFetch(ctx context.Context, key string, fetch func(context.Context) ([]byte, error)) ([]byte, bool, error) {
134-
// Check cache first
135-
if data, ok := c.Get(key); ok {
136-
return data, true, nil
132+
// GetRange retrieves all values for a contiguous register range.
133+
// Returns the per-register/coil values and true only if ALL are cached and fresh.
134+
func (c *Cache) GetRange(slaveID byte, functionCode byte, startAddr uint16, quantity uint16) ([][]byte, bool) {
135+
c.mu.RLock()
136+
defer c.mu.RUnlock()
137+
138+
values := make([][]byte, quantity)
139+
for i := uint16(0); i < quantity; i++ {
140+
key := RegKey(slaveID, functionCode, startAddr+i)
141+
entry, ok := c.entries[key]
142+
if !ok || entry.IsExpired() {
143+
return nil, false
144+
}
145+
data := make([]byte, len(entry.Data))
146+
copy(data, entry.Data)
147+
values[i] = data
148+
}
149+
return values, true
150+
}
151+
152+
// GetRangeStale retrieves all values for a contiguous register range, ignoring TTL.
153+
// Returns the per-register/coil values and true only if ALL are present (even if expired).
154+
func (c *Cache) GetRangeStale(slaveID byte, functionCode byte, startAddr uint16, quantity uint16) ([][]byte, bool) {
155+
c.mu.RLock()
156+
defer c.mu.RUnlock()
157+
158+
values := make([][]byte, quantity)
159+
for i := uint16(0); i < quantity; i++ {
160+
key := RegKey(slaveID, functionCode, startAddr+i)
161+
entry, ok := c.entries[key]
162+
if !ok {
163+
return nil, false
164+
}
165+
data := make([]byte, len(entry.Data))
166+
copy(data, entry.Data)
167+
values[i] = data
168+
}
169+
return values, true
170+
}
171+
172+
// SetRange stores individual values for a contiguous register range.
173+
// All entries are stored with the same timestamp for consistency.
174+
func (c *Cache) SetRange(slaveID byte, functionCode byte, startAddr uint16, values [][]byte) {
175+
c.mu.Lock()
176+
defer c.mu.Unlock()
177+
178+
now := time.Now()
179+
for i, v := range values {
180+
key := RegKey(slaveID, functionCode, startAddr+uint16(i))
181+
dataCopy := make([]byte, len(v))
182+
copy(dataCopy, v)
183+
c.entries[key] = &Entry{
184+
Data: dataCopy,
185+
Timestamp: now,
186+
TTL: c.defaultTTL,
187+
}
137188
}
189+
}
190+
191+
// DeleteRange removes all entries for a contiguous register range.
192+
func (c *Cache) DeleteRange(slaveID byte, functionCode byte, startAddr uint16, quantity uint16) {
193+
c.mu.Lock()
194+
defer c.mu.Unlock()
138195

139-
// Check if there's already an in-flight request
196+
for i := uint16(0); i < quantity; i++ {
197+
key := RegKey(slaveID, functionCode, startAddr+i)
198+
delete(c.entries, key)
199+
}
200+
}
201+
202+
// Coalesce ensures only one fetch runs for a given key at a time.
203+
// Other callers with the same key wait for and share the first caller's result.
204+
// This handles request coalescing only — it does not interact with cache storage.
205+
func (c *Cache) Coalesce(ctx context.Context, key string, fetch func(context.Context) ([]byte, error)) ([]byte, error) {
140206
c.inflightMu.Lock()
141207
if req, ok := c.inflight[key]; ok {
142208
c.inflightMu.Unlock()
143209
// Wait for the in-flight request to complete
144210
select {
145211
case <-req.done:
146212
if req.err != nil {
147-
return nil, false, req.err
213+
return nil, req.err
148214
}
149215
// Return a copy
150216
data := make([]byte, len(req.result))
151217
copy(data, req.result)
152-
return data, false, nil
218+
return data, nil
153219
case <-ctx.Done():
154-
return nil, false, ctx.Err()
220+
return nil, ctx.Err()
155221
}
156222
}
157223

@@ -165,22 +231,23 @@ func (c *Cache) GetOrFetch(ctx context.Context, key string, fetch func(context.C
165231
// Fetch the data
166232
data, err := fetch(ctx)
167233

168-
// Store result
234+
// Store result for waiters
169235
req.result = data
170236
req.err = err
171237

172-
// Cache successful results
173-
if err == nil {
174-
c.Set(key, data)
175-
}
176-
177238
// Clean up and notify waiters
178239
c.inflightMu.Lock()
179240
delete(c.inflight, key)
180241
c.inflightMu.Unlock()
181242
close(req.done)
182243

183-
return data, false, err
244+
if err != nil {
245+
return nil, err
246+
}
247+
248+
result := make([]byte, len(data))
249+
copy(result, data)
250+
return result, nil
184251
}
185252

186253
// cleanup periodically removes expired entries.
@@ -193,6 +260,9 @@ func (c *Cache) cleanup() {
193260
case <-c.done:
194261
return
195262
case <-ticker.C:
263+
if c.keepStale {
264+
continue
265+
}
196266
c.mu.Lock()
197267
for key, entry := range c.entries {
198268
if entry.IsExpired() {

0 commit comments

Comments
 (0)