Implement topic resolver and optimize cache loading#1908
Conversation
…te topic resolver correctly
There was a problem hiding this comment.
Code Review
This pull request introduces a new topic resolver for explicit topic tree navigation and enhances the indicator resolver to support recursive topic expansion. Key changes include the implementation of a TopicExpander interface, a read-through cache for Statistical Variable metadata in TopicCacheManager, and parallelized node fetching in the datasource layer to improve performance and avoid database limits. Feedback focuses on optimizing the cache implementation to prevent stampedes, improving recursive slice allocations, and ensuring robust error handling and logging across the new topic expansion logic.
clincoln8
left a comment
There was a problem hiding this comment.
Thanks Keyur! just a few nits, feel free to ignore
| ) | ||
|
|
||
| // StatVarInfo stores property metadata for a Statistical Variable. | ||
| type StatVarInfo struct { |
There was a problem hiding this comment.
do we want description in this?
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
There was a problem hiding this comment.
can you add a file level comment for this one to explain what this adapter is for. if this is intended to only ever be an adapter for the topic fetch, then consider renaming the file to be more specific.
| @@ -46,7 +46,7 @@ | |||
| ) (*pbv2.ResolveResponse, error) { | |||
| // TODO: Remove this once embeddings search (resolver == "indicator") are | |||
There was a problem hiding this comment.
maybe update the TODO to also mention topic resolution being supported in spanner
| ExpandRoots(ctx context.Context, expandTopics bool) ([]*pbv2.ResolveResponse_Entity_Candidate, error) | ||
| ExpandTopic(ctx context.Context, topicDcid string, expandTopics bool) ([]*pbv2.ResolveResponse_Entity_Candidate, error) | ||
| GetTopicDisplayName(ctx context.Context, topicDcid string) string | ||
| GetSVPropertyInfos(ctx context.Context, svDcids []string) (map[string]SVPropertyInfo, error) |
There was a problem hiding this comment.
nit: thoughts on plural vs singular?
| GetSVPropertyInfos(ctx context.Context, svDcids []string) (map[string]SVPropertyInfo, error) | |
| GetSVPropertyInfo(ctx context.Context, svDcids []string) (map[string]SVPropertyInfo, error) |
| Node: node, | ||
| } | ||
|
|
||
| candidates, err := topicExpander.ExpandTopic(ctx, node, expandTopics) |
There was a problem hiding this comment.
just to confirm, if one topic fails, then we return error for entire response and no partial response?
|
|
||
| candidates, err := topicExpander.ExpandTopic(ctx, node, expandTopics) | ||
| if err != nil { | ||
| return nil, status.Errorf(codes.Internal, "Failed to expand topic %s: %v", node, err) |
There was a problem hiding this comment.
should we log here? or higher up in handler_v2 on error?
| } | ||
|
|
||
| chunks := chunkNodes(nodes, fetchAllChunkSize) | ||
| responses, err := fetchChunksParallel(ctx, ds, req, chunks, pageSize) |
There was a problem hiding this comment.
should we add some sort of debug log for how often we're chunking requests?
| return n.GetName() | ||
| } | ||
| } | ||
| return "" |
There was a problem hiding this comment.
nit: log warning for a missing topic name?
| localResp, remoteResp := <-localRespChan, <-remoteRespChan | ||
|
|
||
| // Note: merger.MergeResolve handles nil inputs (e.g. error handling or empty) gracefully | ||
| v2Resp := merger.MergeResolve(localResp, remoteResp) |
There was a problem hiding this comment.
did you double check that internal/merger/merger.go:MergeResolve correctly merges the resolver=topic case?
particularly recursive merge on children
resolver = "topic"option inside the/v2/resolveAPI and a parameter to request expanding topic hierarchies.Verification / Testing Reference
1. Topic Resolver (expand_topics = false)
Returns immediate sub-topic and variable candidates:
2. Topic Resolver (expand_topics = true)
Recursively expands the entire nested hierarchy cache:
3. Embeddings/Indicator Resolver (expand_topics = false)
Performs semantic embeddings search and returns immediate candidates:
4. Embeddings/Indicator Resolver (expand_topics = true)
Performs semantic search and recursively expands matched topic hierarchies: