@@ -16,6 +16,7 @@ import (
1616 "sort"
1717 "strconv"
1818 "strings"
19+ "sync"
1920 "sync/atomic"
2021 "time"
2122 "unicode"
@@ -573,6 +574,11 @@ func (s *Server) doMutate(ctx context.Context, qc *queryContext, resp *api.Respo
573574 if err != nil {
574575 return errors .Wrapf (err , "While doing mutations:" )
575576 }
577+
578+ // Check program authorization for modifying existing data (after namespace is available)
579+ if err := checkMutationProgramAuth (ctx , edges , ns ); err != nil {
580+ return err
581+ }
576582 predHints := make (map [string ]pb.Metadata_HintType )
577583 for _ , gmu := range qc .gmuList {
578584 for pred , hint := range gmu .Metadata .GetPredHints () {
@@ -660,6 +666,10 @@ func (s *Server) doMutate(ctx context.Context, qc *queryContext, resp *api.Respo
660666 resp .Txn .Keys = resp .Txn .Keys [:0 ]
661667 resp .Txn .CommitTs = cts
662668 calculateMutationMetrics ()
669+
670+ // Update the program facet cache for future authorization checks
671+ updateProgramFacetCache (ns , edges )
672+
663673 return nil
664674}
665675
@@ -2245,6 +2255,148 @@ func parseSubject(predSubject string) (uint64, error) {
22452255 }
22462256}
22472257
// programFacetCache is a process-wide, mutex-guarded map that remembers the
// program facet most recently committed for each (namespace, predicate, entity)
// triple so that authorization checks can see it immediately.
//
// Rationale (as recorded by the original author):
//   - Dgraph's MemoryLayer is a read-through cache: it is populated by reads,
//     not by writes. Its updateItemInCache() only refreshes keys that are
//     already cached and never inserts new ones (see posting/mvcc.go:589-591).
//   - A committed mutation reaches badger asynchronously.
//   - Program authorization has to decide, BEFORE the Raft proposal, whether a
//     user may modify existing data. For data that was created moments ago the
//     MemoryLayer has nothing cached and badger may not have synced yet, so
//     GetNoStoreSafe returns 0 postings.
//
// This cache therefore gives write-through behaviour for program facets only:
//   - updateProgramFacetCache() writes to it after a successful
//     CommitOverNetwork in doMutate().
//   - checkMutationProgramAuth() consults it before falling back to the
//     posting store.
//
// Possible future direction: carry program data on the Posting protobuf itself
// instead of in facets, so that it rides on the existing caching. That needs
// protobuf changes and a migration strategy.
//
// NOTE(review): no eviction or size bound is visible in this part of the file;
// confirm whether entries are ever removed elsewhere, otherwise the map grows
// with every distinct key written.
var programFacetCache = struct {
	// The embedded lock guards data; callers use programFacetCache.Lock/RLock.
	sync.RWMutex
	// data maps a key built by programFacetCacheKey
	// ("namespace-predicate-entity") to the comma-separated program list.
	data map[string]string
}{
	data: map[string]string{},
}
2283+
// programFacetCacheKey builds the programFacetCache map key for one
// (namespace, predicate, entity) triple, formatted as "<ns>-<attr>-<entity>"
// with both numbers in base 10.
//
// The key is assembled with strconv and plain concatenation rather than
// fmt.Sprintf because it is computed for every edge of every mutation; the
// resulting string is byte-for-byte the same as the "%d-%s-%d" format.
func programFacetCacheKey(ns uint64, attr string, entity uint64) string {
	return strconv.FormatUint(ns, 10) + "-" + attr + "-" + strconv.FormatUint(entity, 10)
}
2287+
2288+ // updateProgramFacetCache updates the cache with program facets from committed mutations.
2289+ func updateProgramFacetCache (ns uint64 , edges []* pb.DirectedEdge ) {
2290+ programFacetCache .Lock ()
2291+ defer programFacetCache .Unlock ()
2292+
2293+ for _ , edge := range edges {
2294+ if edge .Entity == 0 {
2295+ continue
2296+ }
2297+ for _ , f := range edge .Facets {
2298+ if f .Key == x .ProgramFacetKey {
2299+ key := programFacetCacheKey (ns , edge .Attr , edge .Entity )
2300+ programFacetCache .data [key ] = string (f .Value )
2301+ glog .V (1 ).Infof ("updateProgramFacetCache: cached programs=%s for key=%s" , f .Value , key )
2302+ break
2303+ }
2304+ }
2305+ }
2306+ }
2307+
2308+ // checkMutationProgramAuth verifies that the user is authorized to modify existing data
2309+ // protected by program facets. This check happens before Raft proposal.
2310+ func checkMutationProgramAuth (ctx context.Context , edges []* pb.DirectedEdge , ns uint64 ) error {
2311+ authCtx := auth .ExtractOrNil (ctx )
2312+ if authCtx == nil || authCtx .IsNil {
2313+ // No auth context means no program restrictions - allow mutation
2314+ glog .V (1 ).Infof ("checkMutationProgramAuth: no auth context, allowing mutation" )
2315+ return nil
2316+ }
2317+
2318+ glog .V (1 ).Infof ("checkMutationProgramAuth: auth context has programs=%v" , authCtx .Programs )
2319+
2320+ // Build a set of user's programs for fast lookup
2321+ userPrograms := make (map [string ]bool )
2322+ for _ , p := range authCtx .Programs {
2323+ userPrograms [p ] = true
2324+ }
2325+
2326+ for _ , edge := range edges {
2327+ // Only check existing entities (non-zero UID)
2328+ if edge .Entity == 0 {
2329+ continue
2330+ }
2331+
2332+ // First check the in-memory cache for recently committed program facets
2333+ cacheKey := programFacetCacheKey (ns , edge .Attr , edge .Entity )
2334+ programFacetCache .RLock ()
2335+ cachedPrograms , found := programFacetCache .data [cacheKey ]
2336+ programFacetCache .RUnlock ()
2337+
2338+ if found {
2339+ glog .V (1 ).Infof ("checkMutationProgramAuth: found cached programs=%s for Entity=%d, Attr=%s" ,
2340+ cachedPrograms , edge .Entity , edge .Attr )
2341+ programs := strings .Split (cachedPrograms , "," )
2342+ hasMatch := false
2343+ for _ , prog := range programs {
2344+ if userPrograms [prog ] {
2345+ hasMatch = true
2346+ break
2347+ }
2348+ }
2349+ if ! hasMatch && len (programs ) > 0 && programs [0 ] != "" {
2350+ glog .V (1 ).Infof ("checkMutationProgramAuth: DENYING - no program match (from cache)" )
2351+ return errors .Errorf ("not authorized to modify program-protected data" )
2352+ }
2353+ continue // Authorized via cache, move to next edge
2354+ }
2355+
2356+ // Fall back to reading from posting store for older data
2357+ namespacedAttr := x .NamespaceAttr (ns , edge .Attr )
2358+ key := x .DataKey (namespacedAttr , edge .Entity )
2359+ glog .V (1 ).Infof ("checkMutationProgramAuth: checking store for Entity=%d, Attr=%s" , edge .Entity , edge .Attr )
2360+
2361+ pl , err := posting .GetNoStoreSafe (key , math .MaxUint64 )
2362+ if err != nil || pl == nil {
2363+ continue
2364+ }
2365+
2366+ // Check each existing posting for program authorization
2367+ var authErr error
2368+ pl .Iterate (math .MaxUint64 , 0 , func (p * pb.Posting ) error {
2369+ for _ , f := range p .Facets {
2370+ if f .Key == x .ProgramFacetKey {
2371+ programs := strings .Split (string (f .Value ), "," )
2372+ glog .V (1 ).Infof ("checkMutationProgramAuth: store has programs=%v, user has=%v" , programs , authCtx .Programs )
2373+ hasMatch := false
2374+ for _ , prog := range programs {
2375+ if userPrograms [prog ] {
2376+ hasMatch = true
2377+ break
2378+ }
2379+ }
2380+ if ! hasMatch && len (programs ) > 0 && programs [0 ] != "" {
2381+ glog .V (1 ).Infof ("checkMutationProgramAuth: DENYING - no program match (from store)" )
2382+ authErr = errors .Errorf ("not authorized to modify program-protected data" )
2383+ return posting .ErrStopIteration
2384+ }
2385+ break
2386+ }
2387+ }
2388+ return nil
2389+ })
2390+
2391+ if authErr != nil {
2392+ return authErr
2393+ }
2394+ }
2395+
2396+ glog .V (1 ).Infof ("checkMutationProgramAuth: allowing mutation" )
2397+ return nil
2398+ }
2399+
22482400// injectProgramFacets adds the dgraph.programs facet to edges based on the auth context.
22492401// If the user has programs in their auth context, those programs are attached to each edge
22502402// being mutated. This enables server-side filtering during queries.
0 commit comments