Last active
January 14, 2026 00:25
-
-
Save sean-/869e8eea18447822e02aaf4538072369 to your computer and use it in GitHub Desktop.
opensearch-go fixes
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| diff --git a/guides/bulk.md b/guides/bulk.md | |
| index 0d31298..ce9219a 100644 | |
| --- a/guides/bulk.md | |
| +++ b/guides/bulk.md | |
| @@ -15,8 +15,11 @@ import ( | |
| "fmt" | |
| "os" | |
| "strings" | |
| + "time" | |
| + "github.com/opensearch-project/opensearch-go/v4" | |
| "github.com/opensearch-project/opensearch-go/v4/opensearchapi" | |
| + "github.com/opensearch-project/opensearch-go/v4/opensearchtransport" | |
| ) | |
| func main() { | |
| @@ -27,6 +30,7 @@ func main() { | |
| } | |
| func example() error { | |
| + // Basic client setup | |
| client, err := opensearchapi.NewDefaultClient() | |
| if err != nil { | |
| return err | |
| @@ -35,6 +39,37 @@ func example() error { | |
| ctx := context.Background() | |
| ``` | |
| +### Advanced Setup: Optimized for Bulk Operations | |
| + | |
| +For high-throughput bulk operations, you can configure the client to automatically route requests to appropriate nodes: | |
| + | |
| +```go | |
| + // Advanced client setup with smart routing for mixed workloads | |
| + advancedClient, err := opensearch.NewClient(opensearch.Config{ | |
| + Addresses: []string{"http://localhost:9200"}, | |
| + | |
| + // Enable node discovery to find all cluster nodes | |
| + DiscoverNodesOnStart: true, | |
| + DiscoverNodesInterval: 5 * time.Minute, | |
| + | |
| + // Configure smart routing: bulk operations go to ingest nodes, searches go to data nodes | |
| + Transport: &opensearchtransport.Config{ | |
| + Selector: opensearchtransport.NewSmartSelector( | |
| + opensearchtransport.NewRoundRobinSelector(), | |
| + ), | |
| + }, | |
| + }) | |
| + if err != nil { | |
| + return err | |
| + } | |
| + | |
| + // This client will automatically route operations to appropriate nodes: | |
| + // - Bulk operations → ingest nodes | |
| + // - Search operations → data nodes | |
| + // - Other operations → round-robin | |
| + _ = advancedClient | |
| +``` | |
| + | |
| Next, create an index named `movies` and another named `books` with the default settings: | |
| ```go | |
| @@ -217,6 +252,86 @@ The following code shows an example on how to look for errors in the response: | |
| } | |
| ``` | |
| +## Performance Optimization for Bulk Operations | |
| + | |
| +### Automatic Ingest Node Routing | |
| + | |
| +For production environments with dedicated ingest nodes, you can optimize bulk operation performance by routing requests to the most appropriate nodes: | |
| + | |
| +```go | |
| + // Create a client optimized for bulk operations | |
| + bulkClient, err := opensearch.NewClient(opensearch.Config{ | |
| + Addresses: []string{"http://localhost:9200"}, | |
| + | |
| + // Enable node discovery | |
| + DiscoverNodesOnStart: true, | |
| + DiscoverNodesInterval: 5 * time.Minute, | |
| + | |
| + // Use smart selector for automatic operation routing | |
| + Transport: &opensearchtransport.Config{ | |
| + Selector: opensearchtransport.NewSmartSelector( | |
| + opensearchtransport.NewRoundRobinSelector(), | |
| + ), | |
| + }, | |
| + }) | |
| + if err != nil { | |
| + return err | |
| + } | |
| + | |
| + // This bulk request will automatically route to ingest nodes | |
| + bulkResp, err := bulkClient.Bulk( | |
| + ctx, | |
| + opensearchapi.BulkReq{ | |
| + Body: strings.NewReader(`{ "index": { "_index": "movies", "_id": "perf-1" } } | |
| +{ "title": "High Performance Bulk", "year": 2024 } | |
| +{ "index": { "_index": "movies", "_id": "perf-2" } } | |
| +{ "title": "Optimized Ingest", "year": 2024 } | |
| +`), | |
| + }, | |
| + ) | |
| + if err != nil { | |
| + return err | |
| + } | |
| + fmt.Printf("Optimized bulk completed with %d items\n", len(bulkResp.Items)) | |
| +``` | |
| + | |
| +### Choosing the Right Selector | |
| + | |
| +You can choose different routing strategies based on your cluster setup: | |
| + | |
| +```go | |
| + // Create a fallback selector (round-robin) | |
| + fallbackSelector := opensearchtransport.NewRoundRobinSelector() | |
| + | |
| + // Option 1: Prefer ingest nodes, fallback to any available node | |
| + ingestPreferred := opensearchtransport.NewRoleBasedSelector( | |
| + opensearchtransport.WithRequiredRoles(opensearchtransport.RoleIngest), | |
| + opensearchtransport.WithFallback(fallbackSelector), | |
| + ) | |
| + | |
| + // Option 2: Only use ingest nodes, fail if none available (strict mode) | |
| + ingestOnly := opensearchtransport.NewRoleBasedSelector( | |
| + opensearchtransport.WithRequiredRoles(opensearchtransport.RoleIngest), | |
| + opensearchtransport.WithStrictMode(), | |
| + ) | |
| + | |
| + // Option 3: Automatically detect operation type and route appropriately | |
| + smartSelector := opensearchtransport.NewSmartSelector(fallbackSelector) | |
| + | |
| + // Option 4: Use the generic selector for custom role combinations | |
| + customSelector := opensearchtransport.NewRoleBasedSelector( | |
| + opensearchtransport.WithRequiredRoles(opensearchtransport.RoleIngest), | |
| + opensearchtransport.WithExcludedRoles(opensearchtransport.RoleClusterManager), | |
| + opensearchtransport.WithFallback(fallbackSelector), | |
| + ) | |
| +``` | |
| + | |
| +The smart selector automatically detects different operation types: | |
| +- **Bulk operations** (`/_bulk`) → Routes to ingest nodes | |
| +- **Ingest pipeline operations** (`/_ingest/`) → Routes to ingest nodes | |
| +- **Search operations** (`/_search`) → Routes to data nodes | |
| +- **Other operations** → Uses default routing | |
| + | |
| ## Cleanup | |
| To clean up the resources created in this guide, delete the `movies` and `books` indices: |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| diff --git a/guides/node_discovery_and_roles.md b/guides/node_discovery_and_roles.md | |
| new file mode 100644 | |
| index 0000000..995420c | |
| --- /dev/null | |
| +++ b/guides/node_discovery_and_roles.md | |
| @@ -0,0 +1,387 @@ | |
| +# Node Discovery and Role Management | |
| + | |
| +This guide covers OpenSearch node discovery functionality and role-based node selection in the Go client. | |
| + | |
| +## Overview | |
| + | |
| +The OpenSearch Go client can automatically discover nodes in your cluster and intelligently select appropriate nodes for routing requests based on their roles. This feature helps ensure optimal performance and follows OpenSearch best practices. | |
| + | |
| +## Basic Node Discovery | |
| + | |
| +### Enabling Node Discovery | |
| + | |
| +```go | |
| +client, err := opensearch.NewClient(opensearch.Config{ | |
| + Addresses: []string{"http://localhost:9200"}, | |
| + // Discover nodes when the client starts | |
| + DiscoverNodesOnStart: true, | |
| + // Periodically refresh the node list every 5 minutes | |
| + DiscoverNodesInterval: 5 * time.Minute, | |
| +}) | |
| +``` | |
| + | |
| +#### Discovery Modes | |
| + | |
| +**Periodic discovery (`DiscoverNodesInterval > 0`)**: | |
| +- **Default: `0` (disabled)** - periodic discovery must be explicitly configured | |
| +- Runs discovery asynchronously in the background, jittered by 10% to prevent a thundering herd | |
| + | |
| +**Synchronous startup discovery (`DiscoverNodesOnStart: true`)**: | |
| +- **Default: `false` (disabled)** - startup discovery must be explicitly enabled | |
| +- Prevents cold-start race conditions where requests fail before initial discovery completes | |
| +- Returns an error if discovery times out (configurable via `DiscoveryTimeout`) | |
| + | |
| +#### Discovery Timeout Configuration | |
| + | |
| +Control how long discovery operations wait before timing out: | |
| + | |
| +```go | |
| +client, err := opensearch.NewClient(opensearch.Config{ | |
| + Addresses: []string{"http://localhost:9200"}, | |
| + DiscoverNodesOnStart: true, | |
| + | |
| + // Timeout configuration for discovery operations | |
| + DiscoveryTimeout: 45 * time.Second, // Default: 30s if not specified | |
| + // DiscoveryTimeout: -1, // No timeout (blocks until completion) | |
| +}) | |
| +``` | |
| +### Manual Node Discovery | |
| + | |
| +You can also trigger node discovery manually: | |
| + | |
| +```go | |
| +err := client.DiscoverNodes() | |
| +if err != nil { | |
| + log.Printf("Error discovering nodes: %s", err) | |
| +} | |
| +``` | |
| + | |
| +## Node Roles | |
| + | |
| +OpenSearch nodes can have various roles that determine their capabilities. The Go client provides constants for these roles: | |
| + | |
| +```go | |
| +import "github.com/opensearch-project/opensearch-go/v4/opensearchtransport" | |
| + | |
| +// Available role constants | |
| +opensearchtransport.RoleData // Data nodes store documents and handle search | |
| +opensearchtransport.RoleIngest // Ingest nodes process documents before indexing | |
| +opensearchtransport.RoleClusterManager // Cluster manager nodes manage cluster state | |
| +opensearchtransport.RoleMaster // Deprecated: use RoleClusterManager instead | |
| +opensearchtransport.RoleSearch // Deprecated in 3.0+: use RoleWarm for searchable snapshots | |
| +opensearchtransport.RoleWarm // Warm nodes for searchable snapshots (OpenSearch 3.0+) | |
| +opensearchtransport.RoleRemoteClusterClient // Enables cross-cluster connections | |
| +``` | |
| + | |
| +### OpenSearch 3.0 Role Changes | |
| + | |
| +**Important**: As of OpenSearch 3.0, there have been significant changes to node roles: | |
| + | |
| +- **Searchable Snapshots**: Nodes that use the searchable snapshots feature must have the `warm` node role instead of the `search` role. | |
| +- **Role Migration**: The `search` role is deprecated for searchable snapshots functionality in OpenSearch 3.0+. | |
| + | |
| +## Role-Based Node Selection | |
| + | |
| +### Dedicated Cluster Manager Nodes | |
| + | |
| +**By default, the client excludes dedicated cluster manager nodes** from request routing (following Java client best practices). You can change this behavior by explicitly setting `IncludeDedicatedClusterManagers: true`: | |
| + | |
| +```go | |
| +client, err := opensearch.NewClient(opensearch.Config{ | |
| + Addresses: []string{"http://localhost:9200"}, | |
| + | |
| + // Enable node discovery | |
| + DiscoverNodesOnStart: true, | |
| + DiscoverNodesInterval: 5 * time.Minute, | |
| + | |
| + // Configure transport with dedicated cluster manager exclusion | |
| + Transport: &opensearchtransport.Config{ | |
| + // Default behavior (can be omitted): excludes dedicated cluster managers | |
| + // IncludeDedicatedClusterManagers: false, | |
| + | |
| + // To include dedicated cluster managers, set to true: | |
| + // IncludeDedicatedClusterManagers: true, | |
| + }, | |
| +}) | |
| +``` | |
| + | |
| +When `IncludeDedicatedClusterManagers` is disabled (default), these nodes will be EXCLUDED from request routing: | |
| +- Nodes with only "cluster_manager" role | |
| +- Nodes with only "master" role (deprecated) | |
| +- Nodes with "cluster_manager" + "remote_cluster_client" roles only | |
| + | |
| +These nodes will be INCLUDED (even with `IncludeDedicatedClusterManagers` disabled): | |
| +- Cluster manager + data nodes | |
| +- Cluster manager + ingest nodes | |
| +- Cluster manager + warm nodes (OpenSearch 3.0+ for searchable snapshots) | |
| +- Pure data nodes | |
| +- Pure ingest nodes | |
| +- Pure warm nodes | |
| +- Search nodes (backward compatibility) | |
| +- Pure remote_cluster_client nodes (coordinating nodes with cross-cluster capability) | |
| + | |
| +### Remote Cluster Client Role | |
| + | |
| +The `remote_cluster_client` role is a **capability role** that enables cross-cluster connections but does not affect node selection for request routing. Nodes with this role can make outbound connections to remote clusters for cross-cluster search and replication operations. | |
| + | |
| +```go | |
| +// Valid: Pure remote cluster client (treated as coordinating node) | |
| +roles := []string{opensearchtransport.RoleRemoteClusterClient} // INCLUDED - effectively coordinating-only | |
| + | |
| +// Valid: Combined with other roles | |
| +roles := []string{opensearchtransport.RoleData, opensearchtransport.RoleRemoteClusterClient} // INCLUDED - data node with capability | |
| + | |
| +// Filtered: Cluster manager + remote cluster client only (follows cluster manager filtering) | |
| +roles := []string{opensearchtransport.RoleClusterManager, opensearchtransport.RoleRemoteClusterClient} // EXCLUDED - dedicated cluster manager | |
| +``` | |
| + | |
| +Since `remote_cluster_client` is a capability role, it is ignored during node filtering. This matches OpenSearch server behavior where the role enables outbound connections but doesn't determine inbound request eligibility. | |
| + | |
| +### Warm Nodes for Searchable Snapshots (OpenSearch 3.0+) | |
| + | |
| +Warm nodes are now the preferred method for searchable snapshots: | |
| + | |
| +```go | |
| +// Recommended for OpenSearch 3.0+: Warm node for searchable snapshots | |
| +roles := []string{opensearchtransport.RoleWarm} | |
| + | |
| +// Also valid: Warm node combined with data | |
| +roles := []string{opensearchtransport.RoleWarm, opensearchtransport.RoleData} | |
| + | |
| +// DEPRECATED in OpenSearch 3.0+: Search role for searchable snapshots | |
| +roles := []string{opensearchtransport.RoleSearch} // Will log deprecation warning | |
| +``` | |
| + | |
| +### Search Node Constraints | |
| + | |
| +Search nodes have special constraints for backward compatibility: | |
| + | |
| +```go | |
| +// Valid: Search-only node (backward compatibility) | |
| +roles := []string{opensearchtransport.RoleSearch} | |
| + | |
| +// INVALID: Search nodes cannot have other roles | |
| +roles := []string{opensearchtransport.RoleSearch, opensearchtransport.RoleData} // Will cause validation error | |
| + | |
| +// INVALID: Cannot mix warm and search roles (OpenSearch 3.0+) | |
| +roles := []string{opensearchtransport.RoleWarm, opensearchtransport.RoleSearch} // Will cause validation error | |
| +``` | |
| + | |
| +## Role Validation | |
| + | |
| +The client validates node roles for compatibility and logs warnings for deprecated configurations: | |
| + | |
| +### Deprecated Roles | |
| + | |
| +```go | |
| +// Master role deprecation warning: | |
| +// "DEPRECATION WARNING: Node [node-1] uses deprecated 'master' role. | |
| +// Please use 'cluster_manager' role instead to promote inclusive language" | |
| + | |
| +// Search role deprecation warning (OpenSearch 3.0+): | |
| +// "DEPRECATION WARNING: Node [node-1] uses 'search' role. As of OpenSearch 3.0, | |
| +// searchable snapshots functionality requires 'warm' role instead. | |
| +// Consider migrating to 'warm' role for future compatibility" | |
| +``` | |
| + | |
| +### Conflicting Roles | |
| + | |
| +```go | |
| +// This configuration will cause an error: | |
| +// "node [node-1] has conflicting roles ["master", "cluster_manager"] - | |
| +// these cannot be assigned together" | |
| + | |
| +// This configuration will cause an error (OpenSearch 3.0+): | |
| +// "node [node-1] cannot have both "warm" and "search" roles - | |
| +// use "warm" for searchable snapshots in OpenSearch 3.0+" | |
| +``` | |
| + | |
| +## OpenSearch 3.X Compatibility | |
| + | |
| +The enhanced role validation ensures compatibility with OpenSearch 3.X best practices: | |
| + | |
| +- **Prevents anti-patterns**: Master + data role combinations that can impact cluster stability | |
| +- **Enforces role separation**: Encourages dedicated cluster manager nodes | |
| +- **Searchable snapshots migration**: Guides users from deprecated `search` role to `warm` role | |
| +- **Future compatibility**: Graceful handling of deprecated roles with warnings | |
| + | |
| +## Migration Guide | |
| + | |
| +### From Master to Cluster Manager | |
| + | |
| +If you're using deprecated "master" roles, migrate to "cluster_manager": | |
| + | |
| +```yaml | |
| +# Old configuration (deprecated) | |
| +node.roles: ["master", "data"] | |
| + | |
| +# New configuration (recommended) | |
| +node.roles: ["cluster_manager", "data"] | |
| +``` | |
| + | |
| +### From Search to Warm (OpenSearch 3.0+) | |
| + | |
| +For searchable snapshots, migrate from "search" to "warm": | |
| + | |
| +```yaml | |
| +# Old configuration (deprecated in OpenSearch 3.0+) | |
| +node.roles: ["search"] | |
| +node.search.cache.size: 50gb | |
| + | |
| +# New configuration (OpenSearch 3.0+) | |
| +node.roles: ["warm"] | |
| +node.search.cache.size: 50gb | |
| +``` | |
| + | |
| +### Role Separation Best Practices | |
| + | |
| +For production clusters, consider separating roles: | |
| + | |
| +```yaml | |
| +# Dedicated cluster manager nodes | |
| +node.roles: ["cluster_manager"] | |
| + | |
| +# Dedicated data nodes | |
| +node.roles: ["data", "ingest"] | |
| + | |
| +# Dedicated warm nodes for searchable snapshots (OpenSearch 3.0+) | |
| +node.roles: ["warm"] | |
| +node.search.cache.size: 50gb | |
| +``` | |
| + | |
| +## Performance Optimizations | |
| + | |
| +The client includes several performance improvements for handling node roles: | |
| + | |
| +- **Fast role lookups**: Role checking is optimized for speed | |
| +- **Efficient validation**: Role compatibility is checked only when nodes are discovered | |
| +- **Minimal overhead**: Role information is processed once per node during discovery | |
| + | |
| +### Connection Preservation Across Discovery Events | |
| + | |
| +**New in v4.7.0**: The client now preserves existing connection states during periodic node discovery, significantly improving performance and stability. | |
| + | |
| +#### How Connection Preservation Works | |
| + | |
| +When node discovery runs (typically every few minutes), the client: | |
| + | |
| +1. **Preserves existing connections**: Live connections to nodes still in the cluster maintain their state | |
| +2. **Maintains resurrection timers**: Dead connections with active resurrection timers are preserved | |
| +3. **Preserves failure counts**: Connection health metrics persist across discovery events | |
| +4. **Only replaces missing nodes**: Connections are only discarded for nodes no longer in the cluster | |
| + | |
| +#### Performance Benefits | |
| + | |
| +```go | |
| +// Before: Discovery would create entirely new connection pools | |
| +// After: Discovery preserves connection states intelligently | |
| + | |
| +client, err := opensearch.NewClient(opensearch.Config{ | |
| + Addresses: []string{"http://node1:9200", "http://node2:9200"}, | |
| + | |
| + // Periodic discovery now preserves connection states | |
| + DiscoverNodesInterval: 5 * time.Minute, | |
| +}) | |
| + | |
| +// Existing connections to node1 and node2 will be preserved | |
| +// Dead connections maintain their resurrection timers | |
| +// New nodes (node3) get added as fresh live connections | |
| +``` | |
| + | |
| +#### Thread Safety and Latency Impact | |
| + | |
| +- **Startup discovery**: When `DiscoverNodesOnStart: true`, discovery blocks client creation (synchronous) | |
| +- **Periodic discovery**: Runs in background goroutines and does NOT block request paths (asynchronous) | |
| +- **Minimal lock contention**: Connection pool updates use brief, atomic operations | |
| +- **Preserved timers**: Resurrection logic continues uninterrupted across discovery events | |
| +- **Reduced churn**: Connection preservation eliminates unnecessary connection recreation | |
| + | |
| +#### Observing Connection Preservation | |
| + | |
| +You can observe connection preservation in debug logs: | |
| + | |
| +```go | |
| +import "github.com/opensearch-project/opensearch-go/v4/opensearchtransport" | |
| + | |
| +// Enable debug logging to see connection preservation | |
| +client, err := opensearch.NewClient(opensearch.Config{ | |
| + Transport: &opensearchtransport.Config{ | |
| + EnableDebugLogger: true, | |
| + }, | |
| +}) | |
| + | |
| +// Logs will show: | |
| +// "Discovered node "node1"; http://node1:9200; roles=[data]" | |
| +// "Discovered node "node2"; http://node2:9200; roles=[ingest]" | |
| +// Connection preservation happens transparently | |
| +``` | |
| + | |
| +## Troubleshooting | |
| + | |
| +### Common Issues | |
| + | |
| +1. **All nodes excluded from routing** | |
| + - Check that you have non-dedicated cluster manager nodes | |
| + - Ensure data/ingest nodes are available | |
| + | |
| +2. **Role validation errors** | |
| + - Remove conflicting master+cluster_manager combinations | |
| + - Ensure search nodes don't have other roles | |
| + | |
| +3. **Discovery failures** | |
| + - Verify network connectivity to cluster nodes | |
| + - Check authentication credentials | |
| + - Review cluster node configuration | |
| + - Increase `DiscoveryTimeout` for slow networks | |
| + - Set `DiscoveryTimeout: -1` to disable the timeout for debugging | |
| + | |
| +### Debug Logging | |
| + | |
| +Enable debug logging to see node discovery details: | |
| + | |
| +```go | |
| +import "github.com/opensearch-project/opensearch-go/v4/opensearchtransport" | |
| + | |
| +// The client will log discovered nodes and role validation results | |
| +// when debug logging is enabled in your application | |
| +``` | |
| + | |
| +## Example: Complete Setup | |
| + | |
| +```go | |
| +package main | |
| + | |
| +import ( | |
| + "log" | |
| + "time" | |
| + | |
| + "github.com/opensearch-project/opensearch-go/v4" | |
| + "github.com/opensearch-project/opensearch-go/v4/opensearchtransport" | |
| +) | |
| + | |
| +func main() { | |
| + client, err := opensearch.NewClient(opensearch.Config{ | |
| + Addresses: []string{"http://localhost:9200"}, | |
| + | |
| + // Enable automatic node discovery | |
| + DiscoverNodesOnStart: true, | |
| + DiscoverNodesInterval: 5 * time.Minute, | |
| + DiscoveryTimeout: 45 * time.Second, // Custom timeout for discovery operations | |
| + | |
| + // Optional: Custom transport for additional configuration | |
| + Transport: &opensearchtransport.Config{ | |
| + // Additional transport settings... | |
| + }, | |
| + }) | |
| + if err != nil { | |
| + log.Fatalf("Error creating client: %s", err) | |
| + } | |
| + | |
| + // Client will automatically: | |
| + // 1. Discover cluster nodes on startup | |
| + // 2. Validate node roles for compatibility | |
| + // 3. Route requests only to appropriate nodes | |
| + // 4. Refresh node list periodically | |
| + // 5. Log deprecation warnings for old role names | |
| +} | |
| +``` |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| diff --git a/guides/search.md b/guides/search.md | |
| index a5ad2e9..16f1458 100644 | |
| --- a/guides/search.md | |
| +++ b/guides/search.md | |
| @@ -11,12 +11,16 @@ package main | |
| import ( | |
| "context" | |
| + "encoding/json" | |
| "fmt" | |
| "os" | |
| "strconv" | |
| "strings" | |
| + "time" | |
| + "github.com/opensearch-project/opensearch-go/v4" | |
| "github.com/opensearch-project/opensearch-go/v4/opensearchapi" | |
| + "github.com/opensearch-project/opensearch-go/v4/opensearchtransport" | |
| ) | |
| func main() { | |
| @@ -27,6 +31,7 @@ func main() { | |
| } | |
| func example() error { | |
| + // Basic client setup | |
| client, err := opensearchapi.NewDefaultClient() | |
| if err != nil { | |
| return err | |
| @@ -34,6 +39,34 @@ func example() error { | |
| ctx := context.Background() | |
| exampleIndex := "movies" | |
| +``` | |
| + | |
| +### Advanced Setup: Search-Optimized Client | |
| + | |
| +For search-heavy applications, you can configure the client to automatically route search requests to nodes optimized for data retrieval: | |
| + | |
| +```go | |
| + // Advanced client setup optimized for search operations | |
| + searchClient, err := opensearch.NewClient(opensearch.Config{ | |
| + Addresses: []string{"http://localhost:9200"}, | |
| + | |
| + // Enable node discovery to find all data nodes | |
| + DiscoverNodesOnStart: true, | |
| + DiscoverNodesInterval: 5 * time.Minute, | |
| + | |
| + // Configure automatic routing to data nodes for search operations | |
| + Transport: &opensearchtransport.Config{ | |
| + Selector: opensearchtransport.NewSmartSelector( | |
| + opensearchtransport.NewRoundRobinSelector(), | |
| + ), | |
| + }, | |
| + }) | |
| + if err != nil { | |
| + return err | |
| + } | |
| + | |
| + // Use search-optimized client for better performance | |
| + _ = searchClient // This client will automatically route searches to data nodes | |
| createResp, err := client.Indices.Create(ctx, opensearchapi.IndicesCreateReq{Index: exampleIndex}) | |
| if err != nil { | |
| @@ -239,6 +272,98 @@ The scroll example above has one weakness: if the index is updated while you are | |
| Note that a point-in-time is associated with an index or a set of index. So, when performing a search with a point-in-time, you DO NOT specify the index in the search. | |
| +## Search Performance Optimization | |
| + | |
| +### Automatic Data Node Routing | |
| + | |
| +For production search workloads, you can optimize performance by ensuring search requests are routed to nodes best suited for data retrieval: | |
| + | |
| +```go | |
| + // Create a search-optimized client | |
| + optimizedSearchClient, err := opensearch.NewClient(opensearch.Config{ | |
| + Addresses: []string{"http://localhost:9200"}, | |
| + | |
| + // Enable node discovery | |
| + DiscoverNodesOnStart: true, | |
| + DiscoverNodesInterval: 5 * time.Minute, | |
| + | |
| + // Use a role-based selector that requires the data role for search optimization | |
| + Transport: &opensearchtransport.Config{ | |
| + Selector: opensearchtransport.NewRoleBasedSelector( | |
| + opensearchtransport.WithRequiredRoles(opensearchtransport.RoleData), | |
| + ), | |
| + }, | |
| + }) | |
| + if err != nil { | |
| + return err | |
| + } | |
| + | |
| + // Search requests will automatically route to data nodes | |
| + searchResp, err := optimizedSearchClient.Search( | |
| + ctx, | |
| + &opensearchapi.SearchReq{ | |
| + Indices: []string{exampleIndex}, | |
| + Params: opensearchapi.SearchParams{ | |
| + Query: `title: "dark knight"`, | |
| + Size: opensearchapi.ToPointer(10), | |
| + }, | |
| + }, | |
| + ) | |
| + if err != nil { | |
| + return err | |
| + } | |
| + fmt.Printf("Optimized search found %d documents\n", searchResp.Hits.Total.Value) | |
| +``` | |
| + | |
| +### Smart Routing for Mixed Workloads | |
| + | |
| +The smart selector automatically detects operation types and routes them to the most appropriate nodes: | |
| + | |
| +```go | |
| + // Smart routing: automatically detects search vs ingest operations | |
| + smartClient, err := opensearch.NewClient(opensearch.Config{ | |
| + Addresses: []string{"http://localhost:9200"}, | |
| + | |
| + DiscoverNodesOnStart: true, | |
| + DiscoverNodesInterval: 5 * time.Minute, | |
| + | |
| + Transport: &opensearchtransport.Config{ | |
| + Selector: opensearchtransport.NewSmartSelector( | |
| + opensearchtransport.NewRoundRobinSelector(), | |
| + ), | |
| + }, | |
| + }) | |
| + if err != nil { | |
| + return err | |
| + } | |
| + | |
| + // Search operations automatically route to data nodes | |
| + _, err = smartClient.Search(ctx, &opensearchapi.SearchReq{ | |
| + Indices: []string{exampleIndex}, | |
| + }) | |
| + if err != nil { | |
| + return err | |
| + } | |
| + | |
| + // Multi-search operations also route to data nodes | |
| + _, err = smartClient.MSearch(ctx, opensearchapi.MSearchReq{ | |
| + Body: strings.NewReader(`{} | |
| +{"query": {"match_all": {}}} | |
| +`), | |
| + }) | |
| + if err != nil { | |
| + return err | |
| + } | |
| +``` | |
| + | |
| +### Routing Strategy Overview | |
| + | |
| +The smart selector provides automatic routing based on the operation being performed: | |
| +- **Search operations** (`/_search`, `/_msearch`, document retrieval) → Data nodes | |
| +- **Bulk operations** (`/_bulk`) → Ingest nodes | |
| +- **Ingest operations** (`/_ingest/`) → Ingest nodes | |
| +- **Other operations** → Default round-robin routing | |
| + | |
| ## Source API | |
| The source API returns the source of the documents with included or excluded fields. The following example returns all fields from document source in the `movies` index: |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| diff --git a/opensearch.go b/opensearch.go | |
| index b04e7c7..b3e830f 100644 | |
| --- a/opensearch.go | |
| +++ b/opensearch.go | |
| @@ -88,8 +88,8 @@ type Config struct { | |
| CompressRequestBody bool // Default: false. | |
| - DiscoverNodesOnStart bool // Discover nodes when initializing the client. Default: false. | |
| - DiscoverNodesInterval time.Duration // Discover nodes periodically. Default: disabled. | |
| + DiscoverNodesOnStart bool // Discover nodes synchronously when initializing the client (blocks until complete). Default: false. | |
| + DiscoverNodesInterval time.Duration // Discover nodes periodically in background. Default: disabled. | |
| EnableMetrics bool // Enable the metrics collection. | |
| EnableDebugLogger bool // Enable the debug logging. | |
| @@ -178,6 +178,7 @@ func NewClient(cfg Config) (*Client, error) { | |
| EnableMetrics: cfg.EnableMetrics, | |
| EnableDebugLogger: cfg.EnableDebugLogger, | |
| + DiscoverNodesOnStart: cfg.DiscoverNodesOnStart, | |
| DiscoverNodesInterval: cfg.DiscoverNodesInterval, | |
| Transport: cfg.Transport, | |
| @@ -191,11 +192,6 @@ func NewClient(cfg Config) (*Client, error) { | |
| client := &Client{Transport: tp} | |
| - if cfg.DiscoverNodesOnStart { | |
| - //nolint:errcheck // goroutine discards return values | |
| - go client.DiscoverNodes() | |
| - } | |
| - | |
| return client, err | |
| } | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| diff --git a/opensearchtransport/connection.go b/opensearchtransport/connection.go | |
| index 58e16ce..8bd3930 100644 | |
| --- a/opensearchtransport/connection.go | |
| +++ b/opensearchtransport/connection.go | |
| @@ -31,8 +31,11 @@ import ( | |
| "fmt" | |
| "math" | |
| "net/url" | |
| + "slices" | |
| "sort" | |
| + "strings" | |
| "sync" | |
| + "sync/atomic" | |
| "time" | |
| ) | |
| @@ -41,11 +44,34 @@ const ( | |
| defaultResurrectTimeoutFactorCutoff = 5 | |
| ) | |
| +// Errors | |
| +var ( | |
| + ErrNoConnections = errors.New("no connections available") | |
| +) | |
| + | |
| // Selector defines the interface for selecting connections from the pool. | |
| type Selector interface { | |
| Select([]*Connection) (*Connection, error) | |
| } | |
| +// RequestAwareSelector extends the basic Selector interface to support request-aware node selection. | |
| +// This allows selectors to make routing decisions based on the request being performed. | |
| +type RequestAwareSelector interface { | |
| + Selector // Embed existing interface for backward compatibility | |
| + SelectForRequest([]*Connection, Request) (*Connection, error) | |
| +} | |
| + | |
| +// Request represents a request that can be used for routing decisions. | |
| +// This interface allows selectors to examine request properties without importing opensearchapi. | |
| +type Request interface { | |
| + GetMethod() string | |
| + GetPath() string | |
| + GetHeaders() map[string]string | |
| +} | |
| + | |
| +// NodeFilter defines a function type for filtering connections based on their properties. | |
| +type NodeFilter func(*Connection) bool | |
| + | |
| // ConnectionPool defines the interface for the connection pool. | |
| type ConnectionPool interface { | |
| Next() (*Connection, error) // Next returns the next available connection. | |
| @@ -54,19 +80,35 @@ type ConnectionPool interface { | |
| URLs() []*url.URL // URLs returns the list of URLs of available connections. | |
| } | |
| -// Connection represents a connection to a node. | |
| -type Connection struct { | |
| - sync.Mutex | |
| +// RequestAwareConnectionPool extends ConnectionPool to support request-aware connection selection. | |
| +type RequestAwareConnectionPool interface { | |
| + ConnectionPool | |
| + NextForRequest(Request) (*Connection, error) // NextForRequest returns connection optimized for the request. | |
| +} | |
| - URL *url.URL | |
| - IsDead bool | |
| - DeadSince time.Time | |
| - Failures int | |
| +// rwLocker defines the interface for connection pools that support read-write locking. | |
| +// This allows for more efficient concurrent access when only read operations are needed. | |
| +type rwLocker interface { | |
| + sync.Locker // Embeds Lock() and Unlock() methods | |
| + RLock() | |
| + RUnlock() | |
| +} | |
| +// Connection represents a connection to a node. | |
| +type Connection struct { | |
| + URL *url.URL | |
| ID string | |
| Name string | |
| Roles []string | |
| Attributes map[string]interface{} | |
| + | |
| + failures atomic.Int64 | |
| + | |
| + mu struct { | |
| + sync.RWMutex | |
| + isDead bool | |
| + deadSince time.Time | |
| + } | |
| } | |
| type singleConnectionPool struct { | |
| @@ -76,10 +118,12 @@ type singleConnectionPool struct { | |
| } | |
| type statusConnectionPool struct { | |
| - sync.Mutex | |
| + mu struct { | |
| + sync.RWMutex | |
| + live []*Connection // List of live connections | |
| + dead []*Connection // List of dead connections | |
| + } | |
| - live []*Connection // List of live connections | |
| - dead []*Connection // List of dead connections | |
| selector Selector | |
| resurrectTimeoutInitial time.Duration | |
| resurrectTimeoutFactorCutoff int | |
| @@ -87,10 +131,18 @@ type statusConnectionPool struct { | |
| metrics *metrics | |
| } | |
| +// Compile-time check to ensure statusConnectionPool implements rwLocker | |
| +var _ rwLocker = (*statusConnectionPool)(nil) | |
| + | |
| type roundRobinSelector struct { | |
| - sync.Mutex | |
| + curr atomic.Int64 // Index of the current connection | |
| +} | |
| - curr int // Index of the current connection | |
| +// NewRoundRobinSelector creates a new round-robin connection selector. | |
| +func NewRoundRobinSelector() *roundRobinSelector { | |
| + s := &roundRobinSelector{} | |
| + s.curr.Store(-1) | |
| + return s | |
| } | |
| // NewConnectionPool creates and returns a default connection pool. | |
| @@ -100,15 +152,32 @@ func NewConnectionPool(conns []*Connection, selector Selector) ConnectionPool { | |
| } | |
| if selector == nil { | |
| - selector = &roundRobinSelector{curr: -1} | |
| + selector = NewRoundRobinSelector() | |
| } | |
| - return &statusConnectionPool{ | |
| - live: conns, | |
| + return newStatusConnectionPool(conns, nil, selector) | |
| +} | |
| + | |
| +// newStatusConnectionPool creates a new statusConnectionPool with proper defaults | |
| +func newStatusConnectionPool(live, dead []*Connection, selector Selector) *statusConnectionPool { | |
| + if live == nil { | |
| + live = []*Connection{} | |
| + } | |
| + if dead == nil { | |
| + dead = []*Connection{} | |
| + } | |
| + if selector == nil { | |
| + selector = NewRoundRobinSelector() | |
| + } | |
| + | |
| + pool := &statusConnectionPool{ | |
| selector: selector, | |
| resurrectTimeoutInitial: defaultResurrectTimeoutInitial, | |
| resurrectTimeoutFactorCutoff: defaultResurrectTimeoutFactorCutoff, | |
| } | |
| + pool.mu.live = live | |
| + pool.mu.dead = dead | |
| + return pool | |
| } | |
| // Next returns the connection from pool. | |
| @@ -116,6 +185,12 @@ func (cp *singleConnectionPool) Next() (*Connection, error) { | |
| return cp.connection, nil | |
| } | |
| +// NextForRequest returns the connection from pool (request-aware version). | |
| +// For single connection pools, this behaves the same as Next(). | |
| +func (cp *singleConnectionPool) NextForRequest(req Request) (*Connection, error) { | |
| + return cp.connection, nil | |
| +} | |
| + | |
| // OnSuccess is a no-op for single connection pool. | |
| func (cp *singleConnectionPool) OnSuccess(*Connection) {} | |
| @@ -129,18 +204,44 @@ func (cp *singleConnectionPool) connections() []*Connection { return []*Connecti | |
| // Next returns a connection from pool, or an error. | |
| func (cp *statusConnectionPool) Next() (*Connection, error) { | |
| - cp.Lock() | |
| - defer cp.Unlock() | |
| + cp.mu.Lock() | |
| + defer cp.mu.Unlock() | |
| // Return next live connection | |
| - if len(cp.live) > 0 { | |
| - return cp.selector.Select(cp.live) | |
| - } else if len(cp.dead) > 0 { | |
| + if len(cp.mu.live) > 0 { | |
| + return cp.selector.Select(cp.mu.live) | |
| + } else if len(cp.mu.dead) > 0 { | |
| + // No live connection is available, resurrect one of the dead ones. | |
| + c := cp.mu.dead[len(cp.mu.dead)-1] | |
| + cp.mu.dead = cp.mu.dead[:len(cp.mu.dead)-1] | |
| + c.mu.Lock() | |
| + defer c.mu.Unlock() | |
| + cp.resurrect(c, false) | |
| + return c, nil | |
| + } | |
| + | |
| + return nil, errors.New("no connection available") | |
| +} | |
| + | |
| +// NextForRequest returns a connection from pool optimized for the request, or an error. | |
| +func (cp *statusConnectionPool) NextForRequest(req Request) (*Connection, error) { | |
| + cp.mu.Lock() | |
| + defer cp.mu.Unlock() | |
| + | |
| + // Return next live connection using request-aware selection if available | |
| + if len(cp.mu.live) > 0 { | |
| + // Try request-aware selection first | |
| + if ras, ok := cp.selector.(RequestAwareSelector); ok { | |
| + return ras.SelectForRequest(cp.mu.live, req) | |
| + } | |
| + // Fall back to basic selection | |
| + return cp.selector.Select(cp.mu.live) | |
| + } else if len(cp.mu.dead) > 0 { | |
| // No live connection is available, resurrect one of the dead ones. | |
| - c := cp.dead[len(cp.dead)-1] | |
| - cp.dead = cp.dead[:len(cp.dead)-1] | |
| - c.Lock() | |
| - defer c.Unlock() | |
| + c := cp.mu.dead[len(cp.mu.dead)-1] | |
| + cp.mu.dead = cp.mu.dead[:len(cp.mu.dead)-1] | |
| + c.mu.Lock() | |
| + defer c.mu.Unlock() | |
| cp.resurrect(c, false) | |
| return c, nil | |
| } | |
| @@ -150,33 +251,34 @@ func (cp *statusConnectionPool) Next() (*Connection, error) { | |
| // OnSuccess marks the connection as successful. | |
| func (cp *statusConnectionPool) OnSuccess(c *Connection) { | |
| - c.Lock() | |
| - defer c.Unlock() | |
| + // Establish consistent lock ordering: Pool → Connection | |
| + cp.mu.Lock() | |
| + defer cp.mu.Unlock() | |
| + | |
| + c.mu.Lock() | |
| + defer c.mu.Unlock() | |
| // Short-circuit for live connection | |
| - if !c.IsDead { | |
| + if !c.mu.isDead { | |
| return | |
| } | |
| - c.markAsHealthy() | |
| - | |
| - cp.Lock() | |
| - defer cp.Unlock() | |
| + c.markAsHealthyWithLock() | |
| cp.resurrect(c, true) | |
| } | |
| // OnFailure marks the connection as failed. | |
| func (cp *statusConnectionPool) OnFailure(c *Connection) error { | |
| - cp.Lock() | |
| - defer cp.Unlock() | |
| + cp.mu.Lock() | |
| + defer cp.mu.Unlock() | |
| - c.Lock() | |
| + c.mu.Lock() | |
| - if c.IsDead { | |
| + if c.mu.isDead { | |
| if debugLogger != nil { | |
| debugLogger.Logf("Already removed %s\n", c.URL) | |
| } | |
| - c.Unlock() | |
| + c.mu.Unlock() | |
| return nil | |
| } | |
| @@ -185,53 +287,52 @@ func (cp *statusConnectionPool) OnFailure(c *Connection) error { | |
| debugLogger.Logf("Removing %s...\n", c.URL) | |
| } | |
| - c.markAsDead() | |
| + c.markAsDeadWithLock() | |
| cp.scheduleResurrect(c) | |
| - c.Unlock() | |
| + c.mu.Unlock() | |
| // Push item to dead list and sort slice by number of failures | |
| - cp.dead = append(cp.dead, c) | |
| - sort.Slice(cp.dead, func(i, j int) bool { | |
| - c1 := cp.dead[i] | |
| - c2 := cp.dead[j] | |
| - c1.Lock() | |
| - c2.Lock() | |
| - defer c1.Unlock() | |
| - defer c2.Unlock() | |
| - | |
| - res := c1.Failures > c2.Failures | |
| - return res | |
| + cp.mu.dead = append(cp.mu.dead, c) | |
| + sort.Slice(cp.mu.dead, func(i, j int) bool { | |
| + c1 := cp.mu.dead[i] | |
| + c2 := cp.mu.dead[j] | |
| + | |
| + // Use atomic loads for failure counts - no locking needed | |
| + failures1 := c1.failures.Load() | |
| + failures2 := c2.failures.Load() | |
| + | |
| + return failures1 > failures2 | |
| }) | |
| // Check if connection exists in the list, return error if not. | |
| index := -1 | |
| - for i, conn := range cp.live { | |
| + for i, conn := range cp.mu.live { | |
| if conn == c { | |
| index = i | |
| } | |
| } | |
| if index < 0 { | |
| - // Does this error even get raised? Under what conditions can the connection not be in the cp.live list? | |
| + // Does this error even get raised? Under what conditions can the connection not be in the cp.mu.live list? | |
| // If the connection is marked dead the function already ended | |
| return errors.New("connection not in live list") | |
| } | |
| // Remove item; https://github.com/golang/go/wiki/SliceTricks | |
| - copy(cp.live[index:], cp.live[index+1:]) | |
| - cp.live = cp.live[:len(cp.live)-1] | |
| + copy(cp.mu.live[index:], cp.mu.live[index+1:]) | |
| + cp.mu.live = cp.mu.live[:len(cp.mu.live)-1] | |
| return nil | |
| } | |
| // URLs returns the list of URLs of available connections. | |
| func (cp *statusConnectionPool) URLs() []*url.URL { | |
| - cp.Lock() | |
| - defer cp.Unlock() | |
| + cp.mu.RLock() | |
| + defer cp.mu.RUnlock() | |
| - urls := make([]*url.URL, len(cp.live)) | |
| - for idx, c := range cp.live { | |
| + urls := make([]*url.URL, len(cp.mu.live)) | |
| + for idx, c := range cp.mu.live { | |
| urls[idx] = c.URL | |
| } | |
| @@ -239,13 +340,40 @@ func (cp *statusConnectionPool) URLs() []*url.URL { | |
| } | |
| func (cp *statusConnectionPool) connections() []*Connection { | |
| + cp.mu.RLock() | |
| + defer cp.mu.RUnlock() | |
| + | |
| var conns []*Connection | |
| - conns = append(conns, cp.live...) | |
| - conns = append(conns, cp.dead...) | |
| + conns = append(conns, cp.mu.live...) | |
| + conns = append(conns, cp.mu.dead...) | |
| return conns | |
| } | |
| +// RLock acquires a read lock on the connection pool. | |
| +// Implements rwLocker interface for efficient concurrent read access. | |
| +func (cp *statusConnectionPool) RLock() { | |
| + cp.mu.RLock() | |
| +} | |
| + | |
| +// RUnlock releases the read lock on the connection pool. | |
| +// Implements rwLocker interface for efficient concurrent read access. | |
| +func (cp *statusConnectionPool) RUnlock() { | |
| + cp.mu.RUnlock() | |
| +} | |
| + | |
| +// Lock acquires a write lock on the connection pool. | |
| +// Implements rwLocker interface (via embedded sync.Locker). | |
| +func (cp *statusConnectionPool) Lock() { | |
| + cp.mu.Lock() | |
| +} | |
| + | |
| +// Unlock releases the write lock on the connection pool. | |
| +// Implements rwLocker interface (via embedded sync.Locker). | |
| +func (cp *statusConnectionPool) Unlock() { | |
| + cp.mu.Unlock() | |
| +} | |
| + | |
| // resurrect adds the connection to the list of available connections. | |
| // When removeDead is true, it also removes it from the dead list. | |
| // The calling code is responsible for locking. | |
| @@ -254,13 +382,13 @@ func (cp *statusConnectionPool) resurrect(c *Connection, removeDead bool) { | |
| debugLogger.Logf("Resurrecting %s\n", c.URL) | |
| } | |
| - c.markAsLive() | |
| - cp.live = append(cp.live, c) | |
| + c.markAsLiveWithLock() | |
| + cp.mu.live = append(cp.mu.live, c) | |
| if removeDead { | |
| index := -1 | |
| - for i, conn := range cp.dead { | |
| + for i, conn := range cp.mu.dead { | |
| if conn == c { | |
| index = i | |
| } | |
| @@ -268,36 +396,41 @@ func (cp *statusConnectionPool) resurrect(c *Connection, removeDead bool) { | |
| if index >= 0 { | |
| // Remove item; https://github.com/golang/go/wiki/SliceTricks | |
| - copy(cp.dead[index:], cp.dead[index+1:]) | |
| - cp.dead = cp.dead[:len(cp.dead)-1] | |
| + copy(cp.mu.dead[index:], cp.mu.dead[index+1:]) | |
| + cp.mu.dead = cp.mu.dead[:len(cp.mu.dead)-1] | |
| } | |
| } | |
| } | |
| // scheduleResurrect schedules the connection to be resurrected. | |
| +// The delay doubles with each recorded failure, capped by resurrectTimeoutFactorCutoff. | |
| +// The caller must already hold c.mu: the visible caller (OnFailure) invokes this | |
| +// between c.mu.Lock() and c.mu.Unlock(), so this function must not lock c.mu itself. | |
| func (cp *statusConnectionPool) scheduleResurrect(c *Connection) { | |
| - factor := math.Min(float64(c.Failures-1), float64(cp.resurrectTimeoutFactorCutoff)) | |
| + failures := c.failures.Load() | |
| + factor := math.Min(float64(failures-1), float64(cp.resurrectTimeoutFactorCutoff)) | |
| timeout := time.Duration(cp.resurrectTimeoutInitial.Seconds() * math.Exp2(factor) * float64(time.Second)) | |
| if debugLogger != nil { | |
| + // sync.RWMutex is not reentrant: taking RLock here while the caller holds | |
| + // the write lock would block forever whenever debug logging is enabled. | |
| + // Read deadSince directly under the caller's lock instead. | |
| + deadSince := c.mu.deadSince | |
| + | |
| debugLogger.Logf( | |
| "Resurrect %s (failures=%d, factor=%1.1f, timeout=%s) in %s\n", | |
| c.URL, | |
| - c.Failures, | |
| + failures, | |
| factor, | |
| timeout, | |
| - c.DeadSince.Add(timeout).Sub(time.Now().UTC()).Truncate(time.Second), | |
| + deadSince.Add(timeout).Sub(time.Now().UTC()).Truncate(time.Second), | |
| ) | |
| } | |
| time.AfterFunc(timeout, func() { | |
| - cp.Lock() | |
| - defer cp.Unlock() | |
| + cp.mu.Lock() | |
| + defer cp.mu.Unlock() | |
| - c.Lock() | |
| - defer c.Unlock() | |
| + c.mu.Lock() | |
| + defer c.mu.Unlock() | |
| - if !c.IsDead { | |
| + if !c.mu.isDead { | |
| if debugLogger != nil { | |
| debugLogger.Logf("Already resurrected %s\n", c.URL) | |
| } | |
| @@ -310,37 +443,277 @@ func (cp *statusConnectionPool) scheduleResurrect(c *Connection) { | |
| // Select returns the connection in a round-robin fashion. | |
| func (s *roundRobinSelector) Select(conns []*Connection) (*Connection, error) { | |
| - s.Lock() | |
| - defer s.Unlock() | |
| + if len(conns) == 0 { | |
| + return nil, ErrNoConnections | |
| + } | |
| - s.curr = (s.curr + 1) % len(conns) | |
| - return conns[s.curr], nil | |
| + // Atomic increment with wrap-around | |
| + next := s.curr.Add(1) | |
| + index := int(next % int64(len(conns))) | |
| + return conns[index], nil | |
| } | |
| // markAsDead marks the connection as dead. | |
| func (c *Connection) markAsDead() { | |
| - c.IsDead = true | |
| - if c.DeadSince.IsZero() { | |
| - c.DeadSince = time.Now().UTC() | |
| + c.mu.Lock() | |
| + defer c.mu.Unlock() | |
| + c.markAsDeadWithLock() | |
| +} | |
| + | |
| +// markAsDeadWithLock marks the connection as dead (caller must hold lock). | |
| +func (c *Connection) markAsDeadWithLock() { | |
| + c.mu.isDead = true | |
| + if c.mu.deadSince.IsZero() { | |
| + c.mu.deadSince = time.Now().UTC() | |
| } | |
| - c.Failures++ | |
| + c.failures.Add(1) | |
| } | |
| // markAsLive marks the connection as alive. | |
| func (c *Connection) markAsLive() { | |
| - c.IsDead = false | |
| + c.mu.Lock() | |
| + defer c.mu.Unlock() | |
| + c.markAsLiveWithLock() | |
| +} | |
| + | |
| +// markAsLiveWithLock marks the connection as alive (caller must hold lock). | |
| +func (c *Connection) markAsLiveWithLock() { | |
| + c.mu.isDead = false | |
| } | |
| // markAsHealthy marks the connection as healthy. | |
| func (c *Connection) markAsHealthy() { | |
| - c.IsDead = false | |
| - c.DeadSince = time.Time{} | |
| - c.Failures = 0 | |
| + c.mu.Lock() | |
| + defer c.mu.Unlock() | |
| + c.markAsHealthyWithLock() | |
| +} | |
| + | |
| +// markAsHealthyWithLock marks the connection as healthy (caller must hold lock). | |
| +func (c *Connection) markAsHealthyWithLock() { | |
| + c.mu.isDead = false | |
| + c.mu.deadSince = time.Time{} | |
| + c.failures.Store(0) | |
| } | |
| // String returns a readable connection representation. | |
| func (c *Connection) String() string { | |
| - c.Lock() | |
| - defer c.Unlock() | |
| - return fmt.Sprintf("<%s> dead=%v failures=%d", c.URL, c.IsDead, c.Failures) | |
| + c.mu.RLock() | |
| + defer c.mu.RUnlock() | |
| + return fmt.Sprintf("<%s> dead=%v failures=%d", c.URL, c.mu.isDead, c.failures.Load()) | |
| +} | |
| + | |
| +// Role-based node selector implementations | |
| + | |
| +// RoleBasedSelector filters connections based on node roles and applies a fallback selector. | |
| +type RoleBasedSelector struct { | |
| + requiredRoles []string // Nodes must have at least one of these roles | |
| + excludedRoles []string // Nodes must not have any of these roles | |
| + fallback Selector // Fallback selector for load balancing among filtered nodes | |
| + allowFallback bool // If true, use any node if no role-matching nodes are available | |
| +} | |
| + | |
| +// RoleBasedSelectorOption configures a role-based selector. | |
| +type RoleBasedSelectorOption func(*RoleBasedSelector) | |
| + | |
| +// WithRequiredRoles specifies roles that nodes must have. | |
| +func WithRequiredRoles(roles ...string) RoleBasedSelectorOption { | |
| + return func(s *RoleBasedSelector) { | |
| + s.requiredRoles = append(s.requiredRoles, roles...) | |
| + } | |
| +} | |
| + | |
| +// WithExcludedRoles specifies roles that nodes must not have. | |
| +func WithExcludedRoles(roles ...string) RoleBasedSelectorOption { | |
| + return func(s *RoleBasedSelector) { | |
| + s.excludedRoles = append(s.excludedRoles, roles...) | |
| + } | |
| +} | |
| + | |
| +// WithStrictMode disables fallback when no matching nodes are found. | |
| +func WithStrictMode() RoleBasedSelectorOption { | |
| + return func(s *RoleBasedSelector) { | |
| + s.allowFallback = false | |
| + } | |
| +} | |
| + | |
| +// WithFallback sets the selector that picks a connection, both among the | |
| +// role-matching nodes and, when fallback is allowed, among all nodes. | |
| +// A nil fallback is ignored so the selector already configured (round-robin by | |
| +// default in NewRoleBasedSelector) stays in place; storing nil would make | |
| +// RoleBasedSelector.Select panic on its first call. | |
| +func WithFallback(fallback Selector) RoleBasedSelectorOption { | |
| + return func(s *RoleBasedSelector) { | |
| + if fallback == nil { | |
| + return | |
| + } | |
| + s.fallback = fallback | |
| + } | |
| +} | |
| + | |
| +// NewRoleBasedSelector creates a role-based connection selector with the specified options. | |
| +// If no fallback is provided via WithFallback, a round-robin selector is used by default. | |
| +func NewRoleBasedSelector(opts ...RoleBasedSelectorOption) *RoleBasedSelector { | |
| + s := &RoleBasedSelector{ | |
| + allowFallback: true, // Default to allowing fallback | |
| + fallback: NewRoundRobinSelector(), // Default fallback | |
| + } | |
| + | |
| + for _, opt := range opts { | |
| + opt(s) | |
| + } | |
| + | |
| + return s | |
| +} | |
| + | |
| +// Select filters connections based on role requirements and applies fallback selection. | |
| +// | |
| +// Outcomes, in order: | |
| +// - no connections at all: returns ErrNoConnections; | |
| +// - at least one connection passes the role filter: the fallback selector | |
| +// chooses among the filtered connections only; | |
| +// - nothing passes the filter and allowFallback is true: the fallback selector | |
| +// chooses among all supplied connections; | |
| +// - nothing passes the filter and allowFallback is false: returns an error. | |
| +func (s *RoleBasedSelector) Select(connections []*Connection) (*Connection, error) { | |
| + if len(connections) == 0 { | |
| + return nil, ErrNoConnections | |
| + } | |
| + | |
| + // Filter connections based on role requirements | |
| + filtered := s.filterByRoles(connections) | |
| + | |
| + // If we have role-matching nodes, use them | |
| + if len(filtered) > 0 { | |
| + return s.fallback.Select(filtered) | |
| + } | |
| + | |
| + // If no role-matching nodes and fallback is allowed, use any available node | |
| + if s.allowFallback { | |
| + return s.fallback.Select(connections) | |
| + } | |
| + | |
| + // Strict mode: no fallback allowed | |
| + // Note: the message reports only requiredRoles, although excludedRoles can | |
| + // also be the reason every connection was filtered out. | |
| + return nil, fmt.Errorf("no connections found matching required roles: %v (available connections: %d)", | |
| + s.requiredRoles, len(connections)) | |
| +} | |
| + | |
| +// filterByRoles filters connections based on required and excluded roles. | |
| +func (s *RoleBasedSelector) filterByRoles(connections []*Connection) []*Connection { | |
| + filtered := make([]*Connection, 0, len(connections)) | |
| + | |
| + for _, conn := range connections { | |
| + // Check if connection has at least one required role | |
| + hasRequiredRole := len(s.requiredRoles) == 0 // If no required roles, all nodes qualify | |
| + if !hasRequiredRole { | |
| + for _, role := range conn.Roles { | |
| + if slices.Contains(s.requiredRoles, role) { | |
| + hasRequiredRole = true | |
| + break | |
| + } | |
| + } | |
| + } | |
| + | |
| + if !hasRequiredRole { | |
| + continue | |
| + } | |
| + | |
| + // Check if connection has any excluded roles | |
| + hasExcludedRole := false | |
| + for _, role := range conn.Roles { | |
| + if slices.Contains(s.excludedRoles, role) { | |
| + hasExcludedRole = true | |
| + break | |
| + } | |
| + } | |
| + | |
| + if hasExcludedRole { | |
| + continue | |
| + } | |
| + | |
| + filtered = append(filtered, conn) | |
| + } | |
| + | |
| + return filtered | |
| +} | |
| + | |
| +// SmartSelector examines request properties to route to optimal nodes. | |
| +type SmartSelector struct { | |
| + ingestSelector Selector // Used for ingest operations | |
| + searchSelector Selector // Used for search operations | |
| + defaultSelector Selector // Used for other operations | |
| +} | |
| + | |
| +// NewSmartSelector creates a request-aware selector that routes based on operation type. | |
| +// defaultFallback balances load within the ingest and search candidate sets and | |
| +// serves every request that is neither; when nil, a round-robin selector is used. | |
| +func NewSmartSelector(defaultFallback Selector) *SmartSelector { | |
| + if defaultFallback == nil { | |
| + // A nil selector would be stored in all three slots and panic on first use. | |
| + defaultFallback = NewRoundRobinSelector() | |
| + } | |
| + | |
| + return &SmartSelector{ | |
| + ingestSelector: NewRoleBasedSelector( | |
| + WithRequiredRoles(RoleIngest), | |
| + WithFallback(defaultFallback), | |
| + ), | |
| + searchSelector: NewRoleBasedSelector( | |
| + WithRequiredRoles(RoleData), | |
| + WithFallback(defaultFallback), | |
| + ), | |
| + defaultSelector: defaultFallback, | |
| + } | |
| +} | |
| + | |
| +// Select implements the basic Selector interface using default routing. | |
| +func (s *SmartSelector) Select(connections []*Connection) (*Connection, error) { | |
| + return s.defaultSelector.Select(connections) | |
| +} | |
| + | |
| +// SelectForRequest implements RequestAwareSelector to route based on request properties. | |
| +// | |
| +// The ingest check runs before the search check, so a request that satisfies | |
| +// both predicates is routed through ingestSelector. Requests matching neither | |
| +// go to defaultSelector. Returns ErrNoConnections when connections is empty. | |
| +// req is dereferenced without a nil check, so callers must pass a non-nil Request. | |
| +func (s *SmartSelector) SelectForRequest(connections []*Connection, req Request) (*Connection, error) { | |
| + if len(connections) == 0 { | |
| + return nil, ErrNoConnections | |
| + } | |
| + | |
| + // Route based on request path and method | |
| + path := req.GetPath() | |
| + method := req.GetMethod() | |
| + | |
| + // Ingest operations - prefer ingest-capable nodes | |
| + if isIngestOperation(path, method) { | |
| + return s.ingestSelector.Select(connections) | |
| + } | |
| + | |
| + // Search operations - prefer data nodes | |
| + if isSearchOperation(path, method) { | |
| + return s.searchSelector.Select(connections) | |
| + } | |
| + | |
| + // Default routing for other operations | |
| + return s.defaultSelector.Select(connections) | |
| +} | |
| + | |
| +// isIngestOperation determines if a request is an ingest operation. | |
| +// It reports true only for POST or PUT requests whose path contains "/_ingest/" | |
| +// or ends with "/_bulk". The method comparison is case-sensitive. | |
| +func isIngestOperation(path, method string) bool { | |
| + if method != "POST" && method != "PUT" { | |
| + return false | |
| + } | |
| + | |
| + // Ingest pipeline operations | |
| + if strings.Contains(path, "/_ingest/") { | |
| + return true | |
| + } | |
| + | |
| + // Bulk operations (often involve ingest pipelines) | |
| + if strings.HasSuffix(path, "/_bulk") { | |
| + return true | |
| + } | |
| + | |
| + // Document indexing that names a pipeline is not detected: this function only | |
| + // sees the path and method. NOTE(review): the pipeline is presumably passed as | |
| + // a query parameter rather than a header - confirm before extending this check. | |
| + | |
| + return false | |
| +} | |
| + | |
| +// isSearchOperation determines if a request is a search operation. | |
| +func isSearchOperation(path, method string) bool { | |
| + if method != "GET" && method != "POST" { | |
| + return false | |
| + } | |
| + | |
| + // Search operations | |
| + if strings.HasSuffix(path, "/_search") || strings.HasPrefix(path, "/_search/") { | |
| + return true | |
| + } | |
| + | |
| + // Multi-search | |
| + if strings.HasSuffix(path, "/_msearch") { | |
| + return true | |
| + } | |
| + | |
| + // Document retrieval | |
| + if method == "GET" && !strings.HasPrefix(path, "/_") { | |
| + return true | |
| + } | |
| + | |
| + return false | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| diff --git a/opensearchtransport/connection_internal_test.go b/opensearchtransport/connection_internal_test.go | |
| index 46c9a31..ad99f2f 100644 | |
| --- a/opensearchtransport/connection_internal_test.go | |
| +++ b/opensearchtransport/connection_internal_test.go | |
| @@ -68,7 +68,7 @@ func TestSingleConnectionPoolOnFailure(t *testing.T) { | |
| func TestStatusConnectionPoolNext(t *testing.T) { | |
| t.Run("No URL", func(t *testing.T) { | |
| - pool := &statusConnectionPool{} | |
| + pool := newStatusConnectionPool(nil, nil, nil) | |
| c, err := pool.Next() | |
| if err == nil { | |
| @@ -79,13 +79,10 @@ func TestStatusConnectionPoolNext(t *testing.T) { | |
| t.Run("Two URLs", func(t *testing.T) { | |
| var c *Connection | |
| - pool := &statusConnectionPool{ | |
| - live: []*Connection{ | |
| - {URL: &url.URL{Scheme: "http", Host: "foo1"}}, | |
| - {URL: &url.URL{Scheme: "http", Host: "foo2"}}, | |
| - }, | |
| - selector: &roundRobinSelector{curr: -1}, | |
| - } | |
| + pool := newStatusConnectionPool([]*Connection{ | |
| + {URL: &url.URL{Scheme: "http", Host: "foo1"}}, | |
| + {URL: &url.URL{Scheme: "http", Host: "foo2"}}, | |
| + }, nil, nil) | |
| c, _ = pool.Next() | |
| @@ -105,14 +102,11 @@ func TestStatusConnectionPoolNext(t *testing.T) { | |
| }) | |
| t.Run("Three URLs", func(t *testing.T) { | |
| - pool := &statusConnectionPool{ | |
| - live: []*Connection{ | |
| - {URL: &url.URL{Scheme: "http", Host: "foo1"}}, | |
| - {URL: &url.URL{Scheme: "http", Host: "foo2"}}, | |
| - {URL: &url.URL{Scheme: "http", Host: "foo3"}}, | |
| - }, | |
| - selector: &roundRobinSelector{curr: -1}, | |
| - } | |
| + pool := newStatusConnectionPool([]*Connection{ | |
| + {URL: &url.URL{Scheme: "http", Host: "foo1"}}, | |
| + {URL: &url.URL{Scheme: "http", Host: "foo2"}}, | |
| + {URL: &url.URL{Scheme: "http", Host: "foo3"}}, | |
| + }, nil, nil) | |
| var expected string | |
| for i := 0; i < 11; i++ { | |
| @@ -121,7 +115,7 @@ func TestStatusConnectionPoolNext(t *testing.T) { | |
| t.Errorf("Unexpected error: %s", err) | |
| } | |
| - switch i % len(pool.live) { | |
| + switch i % len(pool.mu.live) { | |
| case 0: | |
| expected = "http://foo1" | |
| case 1: | |
| @@ -139,13 +133,19 @@ func TestStatusConnectionPoolNext(t *testing.T) { | |
| }) | |
| t.Run("Resurrect dead connection when no live is available", func(t *testing.T) { | |
| - pool := &statusConnectionPool{ | |
| - live: []*Connection{}, | |
| - dead: []*Connection{ | |
| - {URL: &url.URL{Scheme: "http", Host: "foo1"}, Failures: 3}, | |
| - {URL: &url.URL{Scheme: "http", Host: "foo2"}, Failures: 1}, | |
| - }, | |
| - selector: &roundRobinSelector{curr: -1}, | |
| + pool := newStatusConnectionPool(nil, nil, nil) | |
| + pool.mu.live = []*Connection{} | |
| + pool.mu.dead = []*Connection{ | |
| + func() *Connection { | |
| + conn := &Connection{URL: &url.URL{Scheme: "http", Host: "foo1"}} | |
| + conn.failures.Store(3) | |
| + return conn | |
| + }(), | |
| + func() *Connection { | |
| + conn := &Connection{URL: &url.URL{Scheme: "http", Host: "foo2"}} | |
| + conn.failures.Store(1) | |
| + return conn | |
| + }(), | |
| } | |
| c, err := pool.Next() | |
| @@ -161,88 +161,107 @@ func TestStatusConnectionPoolNext(t *testing.T) { | |
| t.Errorf("Expected <http://foo2>, got: %s", c.URL.String()) | |
| } | |
| - if c.IsDead { | |
| + c.mu.Lock() | |
| + isDead := c.mu.isDead | |
| + c.mu.Unlock() | |
| + | |
| + if isDead { | |
| t.Errorf("Expected connection to be live, got: %s", c) | |
| } | |
| - if len(pool.live) != 1 { | |
| - t.Errorf("Expected 1 connection in live list, got: %s", pool.live) | |
| + if len(pool.mu.live) != 1 { | |
| + t.Errorf("Expected 1 connection in live list, got: %s", pool.mu.live) | |
| } | |
| - if len(pool.dead) != 1 { | |
| - t.Errorf("Expected 1 connection in dead list, got: %s", pool.dead) | |
| + if len(pool.mu.dead) != 1 { | |
| + t.Errorf("Expected 1 connection in dead list, got: %s", pool.mu.dead) | |
| } | |
| }) | |
| } | |
| func TestStatusConnectionPoolOnSuccess(t *testing.T) { | |
| t.Run("Move connection to live list and mark it as healthy", func(t *testing.T) { | |
| - pool := &statusConnectionPool{ | |
| - dead: []*Connection{ | |
| - {URL: &url.URL{Scheme: "http", Host: "foo1"}, Failures: 3, IsDead: true}, | |
| - }, | |
| - selector: &roundRobinSelector{curr: -1}, | |
| - } | |
| + deadConn := func() *Connection { | |
| + conn := &Connection{URL: &url.URL{Scheme: "http", Host: "foo1"}} | |
| + conn.failures.Store(3) | |
| + conn.mu.isDead = true | |
| + return conn | |
| + }() | |
| - conn := pool.dead[0] | |
| + pool := newStatusConnectionPool(nil, []*Connection{deadConn}, nil) | |
| + | |
| + conn := pool.mu.dead[0] | |
| pool.OnSuccess(conn) | |
| - if conn.IsDead { | |
| + conn.mu.Lock() | |
| + isDead := conn.mu.isDead | |
| + deadSince := conn.mu.deadSince | |
| + conn.mu.Unlock() | |
| + | |
| + if isDead { | |
| t.Errorf("Expected the connection to be live; %s", conn) | |
| } | |
| - if !conn.DeadSince.IsZero() { | |
| - t.Errorf("Unexpected value for DeadSince: %s", conn.DeadSince) | |
| + if !deadSince.IsZero() { | |
| + t.Errorf("Unexpected value for DeadSince: %s", deadSince) | |
| } | |
| - if len(pool.live) != 1 { | |
| - t.Errorf("Expected 1 live connection, got: %d", len(pool.live)) | |
| + if len(pool.mu.live) != 1 { | |
| + t.Errorf("Expected 1 live connection, got: %d", len(pool.mu.live)) | |
| } | |
| - if len(pool.dead) != 0 { | |
| - t.Errorf("Expected 0 dead connections, got: %d", len(pool.dead)) | |
| + if len(pool.mu.dead) != 0 { | |
| + t.Errorf("Expected 0 dead connections, got: %d", len(pool.mu.dead)) | |
| } | |
| }) | |
| } | |
| func TestStatusConnectionPoolOnFailure(t *testing.T) { | |
| t.Run("Remove connection, mark it, and sort dead connections", func(t *testing.T) { | |
| - pool := &statusConnectionPool{ | |
| - live: []*Connection{ | |
| - {URL: &url.URL{Scheme: "http", Host: "foo1"}}, | |
| - {URL: &url.URL{Scheme: "http", Host: "foo2"}}, | |
| - }, | |
| - dead: []*Connection{ | |
| - {URL: &url.URL{Scheme: "http", Host: "foo3"}, Failures: 0}, | |
| - {URL: &url.URL{Scheme: "http", Host: "foo4"}, Failures: 99}, | |
| - }, | |
| - selector: &roundRobinSelector{curr: -1}, | |
| + deadConns := []*Connection{ | |
| + func() *Connection { | |
| + conn := &Connection{URL: &url.URL{Scheme: "http", Host: "foo3"}} | |
| + conn.failures.Store(0) | |
| + return conn | |
| + }(), | |
| + func() *Connection { | |
| + conn := &Connection{URL: &url.URL{Scheme: "http", Host: "foo4"}} | |
| + conn.failures.Store(99) | |
| + return conn | |
| + }(), | |
| + } | |
| + | |
| + liveConns := []*Connection{ | |
| + {URL: &url.URL{Scheme: "http", Host: "foo1"}}, | |
| + {URL: &url.URL{Scheme: "http", Host: "foo2"}}, | |
| } | |
| - conn := pool.live[0] | |
| + pool := newStatusConnectionPool(liveConns, deadConns, nil) | |
| + | |
| + conn := pool.mu.live[0] | |
| if err := pool.OnFailure(conn); err != nil { | |
| t.Fatalf("Unexpected error: %s", err) | |
| } | |
| - conn.Lock() | |
| - if !conn.IsDead { | |
| + conn.mu.Lock() | |
| + if !conn.mu.isDead { | |
| t.Errorf("Expected the connection to be dead; %s", conn) | |
| } | |
| - if conn.DeadSince.IsZero() { | |
| - t.Errorf("Unexpected value for DeadSince: %s", conn.DeadSince) | |
| + if conn.mu.deadSince.IsZero() { | |
| + t.Errorf("Unexpected value for DeadSince: %s", conn.mu.deadSince) | |
| } | |
| - conn.Unlock() | |
| + conn.mu.Unlock() | |
| - pool.Lock() | |
| - defer pool.Unlock() | |
| - if len(pool.live) != 1 { | |
| - t.Errorf("Expected 1 live connection, got: %d", len(pool.live)) | |
| + pool.mu.Lock() | |
| + defer pool.mu.Unlock() | |
| + if len(pool.mu.live) != 1 { | |
| + t.Errorf("Expected 1 live connection, got: %d", len(pool.mu.live)) | |
| } | |
| - if len(pool.dead) != 3 { | |
| - t.Errorf("Expected 3 dead connections, got: %d", len(pool.dead)) | |
| + if len(pool.mu.dead) != 3 { | |
| + t.Errorf("Expected 3 dead connections, got: %d", len(pool.mu.dead)) | |
| } | |
| expected := []string{ | |
| @@ -252,109 +271,117 @@ func TestStatusConnectionPoolOnFailure(t *testing.T) { | |
| } | |
| for i, u := range expected { | |
| - if pool.dead[i].URL.String() != u { | |
| - t.Errorf("Unexpected value for item %d in pool.dead: %s", i, pool.dead[i].URL.String()) | |
| + if pool.mu.dead[i].URL.String() != u { | |
| + t.Errorf("Unexpected value for item %d in pool.mu.dead: %s", i, pool.mu.dead[i].URL.String()) | |
| } | |
| } | |
| }) | |
| t.Run("Short circuit when the connection is already dead", func(t *testing.T) { | |
| - pool := &statusConnectionPool{ | |
| - live: []*Connection{ | |
| - {URL: &url.URL{Scheme: "http", Host: "foo1"}}, | |
| - {URL: &url.URL{Scheme: "http", Host: "foo2"}}, | |
| - {URL: &url.URL{Scheme: "http", Host: "foo3"}}, | |
| - }, | |
| - selector: &roundRobinSelector{curr: -1}, | |
| + liveConns := []*Connection{ | |
| + {URL: &url.URL{Scheme: "http", Host: "foo1"}}, | |
| + {URL: &url.URL{Scheme: "http", Host: "foo2"}}, | |
| + {URL: &url.URL{Scheme: "http", Host: "foo3"}}, | |
| } | |
| - conn := pool.live[0] | |
| - conn.IsDead = true | |
| + pool := newStatusConnectionPool(liveConns, nil, nil) | |
| + | |
| + conn := pool.mu.live[0] | |
| + conn.mu.Lock() | |
| + conn.mu.isDead = true | |
| + conn.mu.Unlock() | |
| if err := pool.OnFailure(conn); err != nil { | |
| t.Fatalf("Unexpected error: %s", err) | |
| } | |
| - if len(pool.dead) != 0 { | |
| - t.Errorf("Expected the dead list to be empty, got: %s", pool.dead) | |
| + if len(pool.mu.dead) != 0 { | |
| + t.Errorf("Expected the dead list to be empty, got: %s", pool.mu.dead) | |
| } | |
| }) | |
| } | |
| func TestStatusConnectionPoolResurrect(t *testing.T) { | |
| t.Run("Mark the connection as dead and add/remove it to the lists", func(t *testing.T) { | |
| - pool := &statusConnectionPool{ | |
| - live: []*Connection{}, | |
| - dead: []*Connection{{URL: &url.URL{Scheme: "http", Host: "foo1"}, IsDead: true}}, | |
| - selector: &roundRobinSelector{curr: -1}, | |
| - } | |
| - | |
| - conn := pool.dead[0] | |
| - conn.Lock() | |
| - defer conn.Unlock() | |
| + pool := newStatusConnectionPool([]*Connection{}, []*Connection{ | |
| + func() *Connection { | |
| + conn := &Connection{URL: &url.URL{Scheme: "http", Host: "foo1"}} | |
| + conn.mu.isDead = true | |
| + return conn | |
| + }(), | |
| + }, NewRoundRobinSelector()) | |
| + | |
| + conn := pool.mu.dead[0] | |
| pool.resurrect(conn, true) | |
| - if conn.IsDead { | |
| - t.Errorf("Expected connection to be dead, got: %s", conn) | |
| + conn.mu.Lock() | |
| + isDead := conn.mu.isDead | |
| + conn.mu.Unlock() | |
| + | |
| + if isDead { | |
| + t.Errorf("Expected connection to be live, got: %s", conn) | |
| } | |
| - if len(pool.dead) != 0 { | |
| - t.Errorf("Expected no dead connections, got: %s", pool.dead) | |
| + if len(pool.mu.dead) != 0 { | |
| + t.Errorf("Expected no dead connections, got: %s", pool.mu.dead) | |
| } | |
| - if len(pool.live) != 1 { | |
| - t.Errorf("Expected 1 live connection, got: %s", pool.live) | |
| + if len(pool.mu.live) != 1 { | |
| + t.Errorf("Expected 1 live connection, got: %s", pool.mu.live) | |
| } | |
| }) | |
| t.Run("Short circuit removal when the connection is not in the dead list", func(t *testing.T) { | |
| - pool := &statusConnectionPool{ | |
| - dead: []*Connection{{URL: &url.URL{Scheme: "http", Host: "bar"}, IsDead: true}}, | |
| - selector: &roundRobinSelector{curr: -1}, | |
| - } | |
| - | |
| - conn := &Connection{URL: &url.URL{Scheme: "http", Host: "foo1"}, IsDead: true} | |
| - conn.Lock() | |
| - defer conn.Unlock() | |
| + pool := newStatusConnectionPool([]*Connection{}, []*Connection{ | |
| + func() *Connection { | |
| + conn := &Connection{URL: &url.URL{Scheme: "http", Host: "bar"}} | |
| + conn.mu.isDead = true | |
| + return conn | |
| + }(), | |
| + }, NewRoundRobinSelector()) | |
| + | |
| + conn := &Connection{URL: &url.URL{Scheme: "http", Host: "foo1"}} | |
| + conn.mu.Lock() | |
| + conn.mu.isDead = true | |
| + conn.mu.Unlock() | |
| pool.resurrect(conn, true) | |
| - if len(pool.live) != 1 { | |
| - t.Errorf("Expected 1 live connection, got: %s", pool.live) | |
| + if len(pool.mu.live) != 1 { | |
| + t.Errorf("Expected 1 live connection, got: %s", pool.mu.live) | |
| } | |
| - if len(pool.dead) != 1 { | |
| - t.Errorf("Expected 1 dead connection, got: %s", pool.dead) | |
| + if len(pool.mu.dead) != 1 { | |
| + t.Errorf("Expected 1 dead connection, got: %s", pool.mu.dead) | |
| } | |
| }) | |
| t.Run("Schedule resurrect", func(t *testing.T) { | |
| - pool := &statusConnectionPool{ | |
| - live: []*Connection{}, | |
| - dead: []*Connection{ | |
| - { | |
| - URL: &url.URL{Scheme: "http", Host: "foo1"}, | |
| - Failures: 100, | |
| - IsDead: true, | |
| - DeadSince: time.Now().UTC(), | |
| - }, | |
| - }, | |
| - selector: &roundRobinSelector{curr: -1}, | |
| - resurrectTimeoutInitial: 0, | |
| - resurrectTimeoutFactorCutoff: defaultResurrectTimeoutFactorCutoff, | |
| - } | |
| - | |
| - conn := pool.dead[0] | |
| + pool := newStatusConnectionPool([]*Connection{}, []*Connection{ | |
| + func() *Connection { | |
| + conn := &Connection{ | |
| + URL: &url.URL{Scheme: "http", Host: "foo1"}, | |
| + } | |
| + conn.failures.Store(100) | |
| + conn.mu.isDead = true | |
| + conn.mu.deadSince = time.Now().UTC() | |
| + return conn | |
| + }(), | |
| + }, NewRoundRobinSelector()) | |
| + // Override the timeout for this specific test | |
| + pool.resurrectTimeoutInitial = 0 | |
| + | |
| + conn := pool.mu.dead[0] | |
| pool.scheduleResurrect(conn) | |
| time.Sleep(50 * time.Millisecond) | |
| - pool.Lock() | |
| - defer pool.Unlock() | |
| + pool.mu.Lock() | |
| + defer pool.mu.Unlock() | |
| - if len(pool.live) != 1 { | |
| - t.Errorf("Expected 1 live connection, got: %s", pool.live) | |
| + if len(pool.mu.live) != 1 { | |
| + t.Errorf("Expected 1 live connection, got: %s", pool.mu.live) | |
| } | |
| - if len(pool.dead) != 0 { | |
| - t.Errorf("Expected no dead connections, got: %s", pool.dead) | |
| + if len(pool.mu.dead) != 0 { | |
| + t.Errorf("Expected no dead connections, got: %s", pool.mu.dead) | |
| } | |
| }) | |
| } | |
| @@ -362,11 +389,11 @@ func TestStatusConnectionPoolResurrect(t *testing.T) { | |
| func TestConnection(t *testing.T) { | |
| t.Run("String", func(t *testing.T) { | |
| conn := &Connection{ | |
| - URL: &url.URL{Scheme: "http", Host: "foo1"}, | |
| - Failures: 10, | |
| - IsDead: true, | |
| - DeadSince: time.Now().UTC(), | |
| + URL: &url.URL{Scheme: "http", Host: "foo1"}, | |
| } | |
| + conn.failures.Store(10) | |
| + conn.mu.isDead = true | |
| + conn.mu.deadSince = time.Now().UTC() | |
| match, err := regexp.MatchString( | |
| `<http://foo1> dead=true failures=10`, |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| diff --git a/opensearchtransport/discovery.go b/opensearchtransport/discovery.go | |
| index b180b11..5d0f45c 100644 | |
| --- a/opensearchtransport/discovery.go | |
| +++ b/opensearchtransport/discovery.go | |
| @@ -31,14 +31,120 @@ import ( | |
| "encoding/json" | |
| "fmt" | |
| "io" | |
| + "math/rand" | |
| "net" | |
| "net/http" | |
| "net/url" | |
| + "slices" | |
| "strings" | |
| - "sync" | |
| "time" | |
| ) | |
| +// Node role constants to match upstream OpenSearch server definitions | |
| +const ( | |
| + RoleData = "data" | |
| + RoleIngest = "ingest" | |
| + RoleClusterManager = "cluster_manager" | |
| + RoleRemoteClusterClient = "remote_cluster_client" | |
| + RoleSearch = "search" | |
| + RoleWarm = "warm" | |
| + | |
| + // Deprecated: Use RoleClusterManager instead. | |
| + RoleMaster = "master" | |
| + | |
| + // defaultDiscoveryTimeout is the default timeout for node discovery operations. | |
| + // This can be overridden via Config.DiscoveryTimeout. | |
| + defaultDiscoveryTimeout = 30 * time.Second | |
| +) | |
| + | |
| +// addJitter adds ±10% jitter to a duration to prevent thundering herd | |
| +func addJitter(d time.Duration) time.Duration { | |
| + if d <= 0 { | |
| + return d | |
| + } | |
| + // Calculate 10% of the duration | |
| + jitter := float64(d) * 0.1 | |
| + // Add random value between -jitter and +jitter | |
| + randomJitter := (rand.Float64()*2 - 1) * jitter | |
| + return d + time.Duration(randomJitter) | |
| +} | |
| + | |
| +// roleSet represents a set of node roles for efficient O(1) role lookups. | |
| +type roleSet map[string]struct{} | |
| + | |
| +// newRoleSet creates a roleSet from a slice of role names. | |
| +func newRoleSet(roles []string) roleSet { | |
| + rs := make(roleSet, len(roles)) | |
| + for _, role := range roles { | |
| + rs[role] = struct{}{} | |
| + if role == RoleMaster { | |
| + // Alias deprecated "master" role to "cluster_manager" for internal checks, | |
| + // so we only need to perform a single check for "cluster_manager" elsewhere in the library. | |
| + rs[RoleClusterManager] = struct{}{} | |
| + } | |
| + } | |
| + return rs | |
| +} | |
| + | |
| +// has checks if the roleSet contains a specific role using O(1) map lookup. | |
| +func (rs roleSet) has(roleName string) bool { | |
| + _, exists := rs[roleName] | |
| + return exists | |
| +} | |
| + | |
| + // validate returns an error for incompatible role combinations and logs deprecation warnings. | |
| +// This implements validation logic similar to DiscoveryNodeRole.validateRole() | |
| +func (rs roleSet) validate(nodeName string) error { | |
| + hasSearch := rs.has(RoleSearch) | |
| + hasWarm := rs.has(RoleWarm) | |
| + | |
| + // Reject the warm+search combination; warm may still coexist with data roles. | |
| + if hasWarm && hasSearch { | |
| + return fmt.Errorf("node [%s] cannot have both %q and %q roles - use %q for searchable snapshots in OpenSearch 3.0+", | |
| + nodeName, RoleWarm, RoleSearch, RoleWarm) | |
| + } | |
| + | |
| + // Log deprecation warning for master role usage (check roleSet directly) | |
| + if rs.has(RoleMaster) { | |
| + if debugLogger != nil { | |
| + debugLogger.Logf("DEPRECATION WARNING: Node [%s] uses deprecated %q role. Please use %q role instead to promote inclusive language\n", | |
| + nodeName, RoleMaster, RoleClusterManager) | |
| + } | |
| + } | |
| + | |
| + // Log deprecation warning for search role usage in OpenSearch 3.0+ | |
| + if hasSearch { | |
| + if debugLogger != nil { | |
| + debugLogger.Logf("DEPRECATION WARNING: Node [%s] uses %q role. As of OpenSearch 3.0, searchable snapshots functionality requires %q role instead. Consider migrating to %q role for future compatibility\n", | |
| + nodeName, RoleSearch, RoleWarm, RoleWarm) | |
| + } | |
| + } | |
| + | |
| + return nil | |
| +} | |
| + | |
| +// isDedicatedClusterManager implements the logic from upstream Java client | |
| +// NodeSelector.SKIP_DEDICATED_CLUSTER_MASTERS to determine if a node should be skipped. | |
| +// It returns true for nodes that are cluster-manager eligible but have no "work" roles | |
| +// (i.e., roles that actually process/store data or handle requests). | |
| +// This matches OpenSearch server's SniffConnectionStrategy.DEFAULT_NODE_PREDICATE behavior. | |
| +func (rs roleSet) isDedicatedClusterManager() bool { | |
| + // Must be cluster manager eligible first | |
| + if !rs.has(RoleClusterManager) { | |
| + return false | |
| + } | |
| + | |
| + // Check if it has any "work" roles that make it non-dedicated | |
| + workRoles := []string{ | |
| + RoleData, // stores and retrieves data | |
| + RoleIngest, // processes incoming data | |
| + RoleWarm, // handles warm/cold data storage | |
| + RoleSearch, // dedicated search processing (though deprecated in 3.0+) | |
| + } | |
| + | |
| + return !slices.ContainsFunc(workRoles, rs.has) | |
| +} | |
| + | |
| // Discoverable defines the interface for transports supporting node discovery. | |
| type Discoverable interface { | |
| DiscoverNodes() error | |
| @@ -46,10 +152,11 @@ type Discoverable interface { | |
| // nodeInfo represents the information about node in a cluster. | |
| type nodeInfo struct { | |
| - ID string `json:"id"` | |
| - Name string `json:"name"` | |
| - URL *url.URL `json:"url"` | |
| - Roles []string `json:"roles"` | |
| + ID string `json:"id"` | |
| + Name string `json:"name"` | |
| + URL *url.URL `json:"url"` | |
| + Roles []string `json:"roles"` | |
| + roleSet roleSet | |
| Attributes map[string]interface{} `json:"attributes"` | |
| HTTP struct { | |
| PublishAddress string `json:"publish_address"` | |
| @@ -58,9 +165,14 @@ type nodeInfo struct { | |
| // DiscoverNodes reloads the client connections by fetching information from the cluster. | |
| func (c *Client) DiscoverNodes() error { | |
| + return c.DiscoverNodesWithContext(context.Background()) | |
| +} | |
| + | |
| +// DiscoverNodesWithContext reloads the client connections by fetching information from the cluster. | |
| +func (c *Client) DiscoverNodesWithContext(ctx context.Context) error { | |
| conns := make([]*Connection, 0) | |
| - nodes, err := c.getNodesInfo() | |
| + nodes, err := c.getNodesInfo(ctx) | |
| if err != nil { | |
| if debugLogger != nil { | |
| debugLogger.Logf("Error getting nodes info: %s\n", err) | |
| @@ -70,24 +182,31 @@ func (c *Client) DiscoverNodes() error { | |
| } | |
| for _, node := range nodes { | |
| - var isClusterManagerOnlyNode bool | |
| + // Build role set for efficient O(1) lookups | |
| + node.roleSet = newRoleSet(node.Roles) | |
| - if len(node.Roles) == 1 && (node.Roles[0] == "master" || node.Roles[0] == "cluster_manager") { | |
| - isClusterManagerOnlyNode = true | |
| + // Validate node roles for compatibility and anti-patterns | |
| + if err := node.roleSet.validate(node.Name); err != nil { | |
| + if debugLogger != nil { | |
| + debugLogger.Logf("Role validation warning for node %q: %s\n", node.Name, err) | |
| + } | |
| + // Continue processing but log the issue - don't fail discovery entirely | |
| } | |
| + // Skip this node when dedicated cluster managers are excluded (the default) and this node is one. | |
| + shouldSkip := !c.includeDedicatedClusterManagers && node.roleSet.isDedicatedClusterManager() | |
| + | |
| if debugLogger != nil { | |
| var skip string | |
| - if isClusterManagerOnlyNode { | |
| - skip = "; [SKIP]" | |
| + if shouldSkip { | |
| + skip = "; [SKIP: dedicated cluster manager]" | |
| } | |
| - debugLogger.Logf("Discovered node [%s]; %s; roles=%s%s\n", node.Name, node.URL, node.Roles, skip) | |
| + debugLogger.Logf("Discovered node %q; %s; roles=%v%s\n", node.Name, node.URL, node.Roles, skip) | |
| } | |
| - // Skip cluster_manager only nodes | |
| - // TODO: Move logic to Selector? | |
| - if isClusterManagerOnlyNode { | |
| + // Skip dedicated cluster managers (matching upstream Java client behavior) | |
| + if shouldSkip { | |
| continue | |
| } | |
| @@ -100,35 +219,110 @@ func (c *Client) DiscoverNodes() error { | |
| }) | |
| } | |
| - c.Lock() | |
| - defer c.Unlock() | |
| - | |
| - if lockable, ok := c.pool.(sync.Locker); ok { | |
| - lockable.Lock() | |
| - defer lockable.Unlock() | |
| - } | |
| + c.mu.Lock() | |
| + defer c.mu.Unlock() | |
| if c.poolFunc != nil { | |
| - c.pool = c.poolFunc(conns, c.selector) | |
| + c.mu.pool = c.poolFunc(conns, c.selector) | |
| } else { | |
| - // TODO: Replace only live connections, leave dead scheduled for resurrect? | |
| - c.pool = NewConnectionPool(conns, c.selector) | |
| + c.mu.pool = c.updateConnPool(conns, c.selector) | |
| } | |
| return nil | |
| } | |
| -func (c *Client) getNodesInfo() ([]nodeInfo, error) { | |
| +// updateConnPool updates the connection pool, preserving existing connections | |
| +// to nodes still in the cluster and adding new ones. | |
| +func (c *Client) updateConnPool(discoveredConns []*Connection, selector Selector) ConnectionPool { | |
| + // Get existing connections from current pool if it's a statusConnectionPool | |
| + var existingConns []*Connection | |
| + if statusPool, ok := c.mu.pool.(*statusConnectionPool); ok { | |
| + existingConns = statusPool.connections() | |
| + } | |
| + | |
| + // If no existing connections, create new statusConnectionPool | |
| + if len(existingConns) == 0 { | |
| + if selector == nil { | |
| + selector = NewRoundRobinSelector() | |
| + } | |
| + return newStatusConnectionPool(discoveredConns, nil, selector) | |
| + } | |
| + | |
| + // Create map of discovered node URLs for fast lookup | |
| + discoveredURLs := make(map[string]*Connection, len(discoveredConns)) | |
| + for _, conn := range discoveredConns { | |
| + discoveredURLs[conn.URL.String()] = conn | |
| + } | |
| + | |
| + var preservedConns []*Connection | |
| + var newConns []*Connection | |
| + | |
| + // Preserve existing connections that map to discovered nodes | |
| + for _, existing := range existingConns { | |
| + if _, exists := discoveredURLs[existing.URL.String()]; exists { | |
| + preservedConns = append(preservedConns, existing) | |
| + delete(discoveredURLs, existing.URL.String()) // Remove to track new nodes | |
| + } | |
| + } | |
| + | |
| + // Add new connections for nodes not in existing pool | |
| + for _, conn := range discoveredURLs { | |
| + newConns = append(newConns, conn) | |
| + } | |
| + | |
| + // Pre-allocate slices - worst case all preserved could be live or dead, plus all new are live | |
| + totalConns := len(preservedConns) + len(newConns) | |
| + liveConns := make([]*Connection, 0, totalConns) | |
| + deadConns := make([]*Connection, 0, len(preservedConns)) // Only preserved can be dead | |
| + | |
| + // Separate preserved connections into live and dead | |
| + for _, conn := range preservedConns { | |
| + conn.mu.RLock() | |
| + isDead := conn.mu.isDead | |
| + conn.mu.RUnlock() | |
| + | |
| + if isDead { | |
| + deadConns = append(deadConns, conn) | |
| + } else { | |
| + liveConns = append(liveConns, conn) | |
| + } | |
| + } | |
| + | |
| + // All new connections start as live | |
| + liveConns = append(liveConns, newConns...) | |
| + | |
| + // Always create statusConnectionPool to maintain connection state management | |
| + if selector == nil { | |
| + selector = NewRoundRobinSelector() | |
| + } | |
| + | |
| + return newStatusConnectionPool(liveConns, deadConns, selector) | |
| +} | |
| + | |
| +func (c *Client) getNodesInfo(ctx context.Context) ([]nodeInfo, error) { | |
| scheme := c.urls[0].Scheme | |
| - req, err := http.NewRequestWithContext(context.TODO(), http.MethodGet, "/_nodes/http", nil) | |
| + // Set up discovery context with configurable timeout | |
| + var discoveryCtx context.Context | |
| + var cancel context.CancelFunc | |
| + | |
| + if timeout, useTimeout := c.resolveDiscoveryTimeout(); useTimeout { | |
| + // Apply the configured or default timeout | |
| + discoveryCtx, cancel = context.WithTimeout(ctx, timeout) | |
| + defer cancel() | |
| + } else { | |
| + // No timeout - use the provided context as-is | |
| + discoveryCtx = ctx | |
| + } | |
| + | |
| + req, err := http.NewRequestWithContext(discoveryCtx, http.MethodGet, "/_nodes/http", nil) | |
| if err != nil { | |
| return nil, err | |
| } | |
| - c.Lock() | |
| - conn, err := c.pool.Next() | |
| - c.Unlock() | |
| + c.mu.RLock() | |
| + conn, err := c.mu.pool.Next() | |
| + c.mu.RUnlock() | |
| // TODO: If no connection is returned, fallback to original URLs | |
| if err != nil { | |
| return nil, err | |
| @@ -177,6 +371,11 @@ func (c *Client) getNodesInfo() ([]nodeInfo, error) { | |
| idx++ | |
| } | |
| + // Validate that we discovered at least one node | |
| + if len(out) == 0 { | |
| + return nil, fmt.Errorf("discovery returned no nodes - cluster may be unreachable or misconfigured") | |
| + } | |
| + | |
| return out, nil | |
| } | |
| @@ -207,18 +406,44 @@ func (c *Client) getNodeURL(node nodeInfo, scheme string) *url.URL { | |
| return u | |
| } | |
| -func (c *Client) scheduleDiscoverNodes() { | |
| - //nolint:errcheck // errors are logged inside the function | |
| - go c.DiscoverNodes() | |
| - | |
| - c.Lock() | |
| - defer c.Unlock() | |
| - | |
| - if c.discoverNodesTimer != nil { | |
| - c.discoverNodesTimer.Stop() | |
| +func (c *Client) scheduleDiscoverNodes(ctx context.Context) { | |
| + // If interval is not configured, just run discovery once | |
| + if c.discoverNodesInterval <= 0 { | |
| + //nolint:errcheck // errors are logged inside the function | |
| + go c.DiscoverNodesWithContext(ctx) | |
| + return | |
| } | |
| - c.discoverNodesTimer = time.AfterFunc(c.discoverNodesInterval, func() { | |
| - c.scheduleDiscoverNodes() | |
| - }) | |
| + go func() { | |
| + // Initialize discovery context and ticker with initial jitter | |
| + discoveryCtx, discoveryCancel := context.WithCancel(ctx) | |
| + jitteredInterval := addJitter(c.discoverNodesInterval) | |
| + ticker := time.NewTicker(jitteredInterval) | |
| + | |
| + // Ensure we clean up when we exit | |
| + defer func() { | |
| + discoveryCancel() | |
| + ticker.Stop() | |
| + }() | |
| + | |
| + // Run discovery immediately, then on each tick with fresh jitter | |
| + for { | |
| + // Start discovery (non-blocking) | |
| + //nolint:errcheck // errors are logged inside the function | |
| + go c.DiscoverNodesWithContext(discoveryCtx) | |
| + | |
| + select { | |
| + case <-ctx.Done(): | |
| + return | |
| + case <-ticker.C: | |
| + // Cancel previous discovery and reset for next iteration | |
| + discoveryCancel() | |
| + discoveryCtx, discoveryCancel = context.WithCancel(ctx) | |
| + | |
| + // Reset ticker with fresh jitter to avoid synchronization across clients | |
| + jitteredInterval = addJitter(c.discoverNodesInterval) | |
| + ticker.Reset(jitteredInterval) | |
| + } | |
| + } | |
| + }() | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| diff --git a/opensearchtransport/discovery_internal_test.go b/opensearchtransport/discovery_internal_test.go | |
| index 79373c3..37a4a34 100644 | |
| --- a/opensearchtransport/discovery_internal_test.go | |
| +++ b/opensearchtransport/discovery_internal_test.go | |
| @@ -32,12 +32,15 @@ import ( | |
| "bytes" | |
| "crypto/tls" | |
| "encoding/json" | |
| + "errors" | |
| "fmt" | |
| "io" | |
| "net/http" | |
| "net/url" | |
| "os" | |
| "reflect" | |
| + "slices" | |
| + "strings" | |
| "testing" | |
| "time" | |
| @@ -79,7 +82,7 @@ func TestDiscovery(t *testing.T) { | |
| u, _ := url.Parse("http://" + srv.Addr) | |
| tp, _ := New(Config{URLs: []*url.URL{u}}) | |
| - nodes, err := tp.getNodesInfo() | |
| + nodes, err := tp.getNodesInfo(t.Context()) | |
| if err != nil { | |
| t.Fatalf("ERROR: %s", err) | |
| } | |
| @@ -123,27 +126,52 @@ func TestDiscovery(t *testing.T) { | |
| tp, err := New(Config{URLs: []*url.URL{u}, Transport: newRoundTripper()}) | |
| require.NoError(t, err) | |
| - _, err = tp.getNodesInfo() | |
| + _, err = tp.getNodesInfo(t.Context()) | |
| assert.Error(t, err) | |
| assert.Contains(t, err.Error(), "unexpected empty body") | |
| }) | |
| + t.Run("getNodesInfo() empty nodes", func(t *testing.T) { | |
| + newRoundTripper := func() http.RoundTripper { | |
| + return &mockTransp{ | |
| + RoundTripFunc: func(req *http.Request) (*http.Response, error) { | |
| + // Return valid JSON response with empty nodes object | |
| + body := `{"nodes": {}}` | |
| + return &http.Response{ | |
| + StatusCode: 200, | |
| + Header: http.Header{"Content-Type": []string{"application/json"}}, | |
| + Body: io.NopCloser(strings.NewReader(body)), | |
| + }, nil | |
| + }, | |
| + } | |
| + } | |
| + | |
| + u, _ := url.Parse("http://localhost:8080") | |
| + tp, err := New(Config{URLs: []*url.URL{u}, Transport: newRoundTripper()}) | |
| + require.NoError(t, err) | |
| + | |
| + _, err = tp.getNodesInfo(t.Context()) | |
| + assert.Error(t, err) | |
| + assert.Contains(t, err.Error(), "discovery returned no nodes") | |
| + assert.Contains(t, err.Error(), "cluster may be unreachable or misconfigured") | |
| + }) | |
| + | |
| t.Run("DiscoverNodes()", func(t *testing.T) { | |
| u, _ := url.Parse("http://" + srv.Addr) | |
| tp, _ := New(Config{URLs: []*url.URL{u}}) | |
| tp.DiscoverNodes() | |
| - pool, ok := tp.pool.(*statusConnectionPool) | |
| + pool, ok := tp.mu.pool.(*statusConnectionPool) | |
| if !ok { | |
| - t.Fatalf("Unexpected pool, want=statusConnectionPool, got=%T", tp.pool) | |
| + t.Fatalf("Unexpected pool, want=statusConnectionPool, got=%T", tp.mu.pool) | |
| } | |
| - if len(pool.live) != 2 { | |
| - t.Errorf("Unexpected number of nodes, want=2, got=%d", len(pool.live)) | |
| + if len(pool.mu.live) != 2 { | |
| + t.Errorf("Unexpected number of nodes, want=2, got=%d", len(pool.mu.live)) | |
| } | |
| - for _, conn := range pool.live { | |
| + for _, conn := range pool.mu.live { | |
| switch conn.Name { | |
| case "es1": | |
| if conn.URL.String() != "http://127.0.0.1:10001" { | |
| @@ -174,16 +202,16 @@ func TestDiscovery(t *testing.T) { | |
| tp.DiscoverNodes() | |
| - pool, ok := tp.pool.(*statusConnectionPool) | |
| + pool, ok := tp.mu.pool.(*statusConnectionPool) | |
| if !ok { | |
| - t.Fatalf("Unexpected pool, want=statusConnectionPool, got=%T", tp.pool) | |
| + t.Fatalf("Unexpected pool, want=statusConnectionPool, got=%T", tp.mu.pool) | |
| } | |
| - if len(pool.live) != 2 { | |
| - t.Errorf("Unexpected number of nodes, want=2, got=%d", len(pool.live)) | |
| + if len(pool.mu.live) != 2 { | |
| + t.Errorf("Unexpected number of nodes, want=2, got=%d", len(pool.mu.live)) | |
| } | |
| - for _, conn := range pool.live { | |
| + for _, conn := range pool.mu.live { | |
| switch conn.Name { | |
| case "es1": | |
| if conn.URL.String() != "https://127.0.0.1:10001" { | |
| @@ -207,17 +235,17 @@ func TestDiscovery(t *testing.T) { | |
| tp, _ := New(Config{URLs: []*url.URL{u}, DiscoverNodesInterval: 10 * time.Millisecond}) | |
| - tp.Lock() | |
| - numURLs = len(tp.pool.URLs()) | |
| - tp.Unlock() | |
| + tp.mu.Lock() | |
| + numURLs = len(tp.mu.pool.URLs()) | |
| + tp.mu.Unlock() | |
| if numURLs != 1 { | |
| t.Errorf("Unexpected number of nodes, want=1, got=%d", numURLs) | |
| } | |
| time.Sleep(18 * time.Millisecond) // Wait until (*Client).scheduleDiscoverNodes() | |
| - tp.Lock() | |
| - numURLs = len(tp.pool.URLs()) | |
| - tp.Unlock() | |
| + tp.mu.Lock() | |
| + numURLs = len(tp.mu.pool.URLs()) | |
| + tp.mu.Unlock() | |
| if numURLs != 2 { | |
| t.Errorf("Unexpected number of nodes, want=2, got=%d", numURLs) | |
| } | |
| @@ -520,7 +548,19 @@ func TestDiscovery(t *testing.T) { | |
| nodes := make(map[string]map[string]nodeInfo) | |
| nodes["nodes"] = make(map[string]nodeInfo) | |
| for name, node := range tt.args.Nodes { | |
| - nodes["nodes"][name] = nodeInfo{Roles: node.Roles} | |
| + // Parse URL to extract host and port for PublishAddress | |
| + if u, err := url.Parse(node.URL); err == nil { | |
| + nodes["nodes"][name] = nodeInfo{ | |
| + Roles: node.Roles, | |
| + HTTP: struct { | |
| + PublishAddress string `json:"publish_address"` | |
| + }{ | |
| + PublishAddress: u.Host, // e.g., "es1:9200" | |
| + }, | |
| + } | |
| + } else { | |
| + nodes["nodes"][name] = nodeInfo{Roles: node.Roles} | |
| + } | |
| } | |
| b, _ := json.Marshal(nodes) | |
| @@ -542,16 +582,16 @@ func TestDiscovery(t *testing.T) { | |
| }) | |
| c.DiscoverNodes() | |
| - pool, ok := c.pool.(*statusConnectionPool) | |
| + pool, ok := c.mu.pool.(*statusConnectionPool) | |
| if !ok { | |
| - t.Fatalf("Unexpected pool, want=statusConnectionPool, got=%T", c.pool) | |
| + t.Fatalf("Unexpected pool, want=statusConnectionPool, got=%T", c.mu.pool) | |
| } | |
| - if len(pool.live) != tt.want.wantsNConn { | |
| - t.Errorf("Unexpected number of nodes, want=%d, got=%d", tt.want.wantsNConn, len(pool.live)) | |
| + if len(pool.mu.live) != tt.want.wantsNConn { | |
| + t.Errorf("Unexpected number of nodes, want=%d, got=%d", tt.want.wantsNConn, len(pool.mu.live)) | |
| } | |
| - for _, conn := range pool.live { | |
| + for _, conn := range pool.mu.live { | |
| if !reflect.DeepEqual(tt.args.Nodes[conn.ID].Roles, conn.Roles) { | |
| t.Errorf("Unexpected roles for node %s, want=%s, got=%s", conn.Name, tt.args.Nodes[conn.ID], conn.Roles) | |
| } | |
| @@ -564,3 +604,1228 @@ func TestDiscovery(t *testing.T) { | |
| } | |
| }) | |
| } | |
| + | |
| +// TestRoleConstants verifies that role constants match expected values | |
| +func TestRoleConstants(t *testing.T) { | |
| + assert.Equal(t, "data", RoleData) | |
| + assert.Equal(t, "ingest", RoleIngest) | |
| + assert.Equal(t, "master", RoleMaster) | |
| + assert.Equal(t, "cluster_manager", RoleClusterManager) | |
| + assert.Equal(t, "remote_cluster_client", RoleRemoteClusterClient) | |
| + assert.Equal(t, "search", RoleSearch) | |
| + assert.Equal(t, "warm", RoleWarm) | |
| +} | |
| + | |
| + // TestNewRoleSet verifies role set construction, including the master -> cluster_manager alias. | |
| +func TestNewRoleSet(t *testing.T) { | |
| + tests := []struct { | |
| + name string | |
| + roles []string | |
| + want roleSet | |
| + }{ | |
| + { | |
| + "empty roles", | |
| + []string{}, | |
| + roleSet{}, | |
| + }, | |
| + { | |
| + "single role", | |
| + []string{RoleData}, | |
| + roleSet{RoleData: {}}, | |
| + }, | |
| + { | |
| + "multiple roles", | |
| + []string{RoleData, RoleIngest, RoleClusterManager}, | |
| + roleSet{ | |
| + RoleData: {}, | |
| + RoleIngest: {}, | |
| + RoleClusterManager: {}, | |
| + }, | |
| + }, | |
| + { | |
| + "depcrecated master role is aliased to cluster_manager", | |
| + []string{RoleMaster}, | |
| + roleSet{ | |
| + RoleMaster: {}, | |
| + RoleClusterManager: {}, | |
| + }, | |
| + }, | |
| + } | |
| + | |
| + for _, tt := range tests { | |
| + t.Run(tt.name, func(t *testing.T) { | |
| + got := newRoleSet(tt.roles) | |
| + assert.Equal(t, tt.want, got) | |
| + }) | |
| + } | |
| +} | |
| + | |
| + // TestRoleSetHas verifies role membership lookups for present and absent roles. | |
| +func TestRoleSetHas(t *testing.T) { | |
| + rs := newRoleSet([]string{RoleData, RoleClusterManager, RoleIngest}) | |
| + | |
| + assert.True(t, rs.has(RoleData)) | |
| + assert.True(t, rs.has(RoleClusterManager)) | |
| + assert.True(t, rs.has(RoleIngest)) | |
| + assert.False(t, rs.has(RoleMaster)) | |
| + assert.False(t, rs.has(RoleSearch)) | |
| + assert.False(t, rs.has("nonexistent")) | |
| +} | |
| + | |
| +// TestRoleCheckFunctions verifies role-specific check functions | |
| +func TestRoleCheckFunctions(t *testing.T) { | |
| + tests := []struct { | |
| + name string | |
| + roles []string | |
| + expectClusterManager bool | |
| + expectData bool | |
| + expectIngest bool | |
| + expectSearch bool | |
| + expectWarm bool | |
| + }{ | |
| + { | |
| + "cluster manager eligible with cluster_manager role", | |
| + []string{RoleClusterManager}, | |
| + true, false, false, false, false, | |
| + }, | |
| + { | |
| + "cluster manager eligible with deprecated master role", | |
| + []string{RoleMaster}, | |
| + true, false, false, false, false, | |
| + }, | |
| + { | |
| + "data node", | |
| + []string{RoleData}, | |
| + false, true, false, false, false, | |
| + }, | |
| + { | |
| + "ingest node", | |
| + []string{RoleIngest}, | |
| + false, false, true, false, false, | |
| + }, | |
| + { | |
| + "search node", | |
| + []string{RoleSearch}, | |
| + false, false, false, true, false, | |
| + }, | |
| + { | |
| + "warm node", | |
| + []string{RoleWarm}, | |
| + false, false, false, false, true, | |
| + }, | |
| + { | |
| + "mixed roles", | |
| + []string{RoleData, RoleIngest, RoleClusterManager}, | |
| + true, true, true, false, false, | |
| + }, | |
| + { | |
| + "warm and data roles", | |
| + []string{RoleWarm, RoleData}, | |
| + false, true, false, false, true, | |
| + }, | |
| + { | |
| + "no roles", | |
| + []string{}, | |
| + false, false, false, false, false, | |
| + }, | |
| + } | |
| + | |
| + for _, tt := range tests { | |
| + t.Run(tt.name, func(t *testing.T) { | |
| + rs := newRoleSet(tt.roles) | |
| + | |
| + // Check cluster manager eligibility | |
| + isClusterManagerEligible := rs.has(RoleMaster) || rs.has(RoleClusterManager) | |
| + assert.Equal(t, tt.expectClusterManager, isClusterManagerEligible) | |
| + assert.Equal(t, tt.expectData, rs.has(RoleData)) | |
| + assert.Equal(t, tt.expectIngest, rs.has(RoleIngest)) | |
| + assert.Equal(t, tt.expectSearch, rs.has(RoleSearch)) | |
| + assert.Equal(t, tt.expectWarm, rs.has(RoleWarm)) | |
| + }) | |
| + } | |
| +} | |
| + | |
| +// TestValidateNodeRoles verifies role validation logic | |
| +func TestValidateNodeRoles(t *testing.T) { | |
| + tests := []struct { | |
| + name string | |
| + roles []string | |
| + nodeName string | |
| + wantError bool | |
| + errorMsg string | |
| + }{ | |
| + { | |
| + "valid cluster_manager only", | |
| + []string{RoleClusterManager}, | |
| + "test-node", | |
| + false, | |
| + "", | |
| + }, | |
| + { | |
| + "valid data only", | |
| + []string{RoleData}, | |
| + "test-node", | |
| + false, | |
| + "", | |
| + }, | |
| + { | |
| + "valid cluster_manager and data", | |
| + []string{RoleClusterManager, RoleData}, | |
| + "test-node", | |
| + false, | |
| + "", | |
| + }, | |
| + { | |
| + "valid warm node only", | |
| + []string{RoleWarm}, | |
| + "warm-node", | |
| + false, | |
| + "", | |
| + }, | |
| + { | |
| + "valid warm and data combination", | |
| + []string{RoleWarm, RoleData}, | |
| + "warm-data-node", | |
| + false, | |
| + "", | |
| + }, | |
| + { | |
| + "deprecated master role - should log warning but not error", | |
| + []string{RoleMaster, RoleData}, | |
| + "test-node", | |
| + false, | |
| + "", | |
| + }, | |
| + { | |
| + "deprecated search role - should log warning but not error", | |
| + []string{RoleSearch}, | |
| + "search-node", | |
| + false, | |
| + "", | |
| + }, | |
| + { | |
| + "master and cluster_manager may both be set (master is just an alias)", | |
| + []string{RoleMaster, RoleClusterManager}, | |
| + "test-node", | |
| + false, | |
| + "", | |
| + }, | |
| + { | |
| + "valid search with data role (OpenSearch allows combination)", | |
| + []string{RoleSearch, RoleData}, | |
| + "search-node", | |
| + false, | |
| + "", | |
| + }, | |
| + { | |
| + "valid search only (backward compatibility)", | |
| + []string{RoleSearch}, | |
| + "search-node", | |
| + false, | |
| + "", | |
| + }, | |
| + { | |
| + "invalid warm and search combination (OpenSearch 3.0)", | |
| + []string{RoleWarm, RoleSearch}, | |
| + "hybrid-node", | |
| + true, | |
| + "cannot have both \"warm\" and \"search\" roles", | |
| + }, | |
| + { | |
| + "valid complex search combination (OpenSearch allows multi-role)", | |
| + []string{RoleSearch, RoleData, RoleIngest}, | |
| + "multi-role-node", | |
| + false, | |
| + "", | |
| + }, | |
| + } | |
| + | |
| + for _, tt := range tests { | |
| + t.Run(tt.name, func(t *testing.T) { | |
| + rs := newRoleSet(tt.roles) | |
| + err := rs.validate(tt.nodeName) | |
| + | |
| + if tt.wantError { | |
| + assert.Error(t, err) | |
| + assert.Contains(t, err.Error(), tt.errorMsg) | |
| + } else { | |
| + assert.NoError(t, err) | |
| + } | |
| + }) | |
| + } | |
| +} | |
| + | |
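| + // TestShouldSkipDedicatedClusterManagers verifies that isDedicatedClusterManager is true only for nodes limited to cluster_manager/master roles (optionally plus remote_cluster_client) | |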
| +func TestShouldSkipDedicatedClusterManagers(t *testing.T) { | |
| + tests := []struct { | |
| + name string | |
| + roles []string | |
| + shouldSkip bool | |
| + }{ | |
| + { | |
| + "cluster_manager only - should skip", | |
| + []string{RoleClusterManager}, | |
| + true, | |
| + }, | |
| + { | |
| + "master only - should skip (deprecated)", | |
| + []string{RoleMaster}, | |
| + true, | |
| + }, | |
| + { | |
| + "cluster_manager with data - should not skip", | |
| + []string{RoleClusterManager, RoleData}, | |
| + false, | |
| + }, | |
| + { | |
| + "cluster_manager with ingest - should not skip", | |
| + []string{RoleClusterManager, RoleIngest}, | |
| + false, | |
| + }, | |
| + { | |
| + "cluster_manager with warm - should not skip (OpenSearch 3.0 searchable snapshots)", | |
| + []string{RoleClusterManager, RoleWarm}, | |
| + false, | |
| + }, | |
| + { | |
| + "cluster_manager with data and ingest - should not skip", | |
| + []string{RoleClusterManager, RoleData, RoleIngest}, | |
| + false, | |
| + }, | |
| + { | |
| + "data only - should not skip", | |
| + []string{RoleData}, | |
| + false, | |
| + }, | |
| + { | |
| + "ingest only - should not skip", | |
| + []string{RoleIngest}, | |
| + false, | |
| + }, | |
| + { | |
| + "search only - should not skip", | |
| + []string{RoleSearch}, | |
| + false, | |
| + }, | |
| + { | |
| + "warm only - should not skip", | |
| + []string{RoleWarm}, | |
| + false, | |
| + }, | |
| + { | |
| + "warm and data - should not skip", | |
| + []string{RoleWarm, RoleData}, | |
| + false, | |
| + }, | |
| + { | |
| + "master with remote_cluster_client - should skip", | |
| + []string{RoleMaster, RoleRemoteClusterClient}, | |
| + true, | |
| + }, | |
| + { | |
| + "cluster_manager with remote_cluster_client - should skip", | |
| + []string{RoleClusterManager, RoleRemoteClusterClient}, | |
| + true, | |
| + }, | |
| + { | |
| + "no roles - should not skip", | |
| + []string{}, | |
| + false, | |
| + }, | |
| + } | |
| + | |
| + for _, tt := range tests { | |
| + t.Run(tt.name, func(t *testing.T) { | |
| + rs := newRoleSet(tt.roles) | |
| + result := rs.isDedicatedClusterManager() | |
| + assert.Equal(t, tt.shouldSkip, result) | |
| + }) | |
| + } | |
| +} | |
| + | |
| + // TestDiscoverNodesWithNewRoleValidation verifies that, with the default configuration, DiscoverNodes drops dedicated cluster-manager nodes and keeps all other nodes in the pool | |
| +func TestDiscoverNodesWithNewRoleValidation(t *testing.T) { | |
| + tests := []struct { | |
| + name string | |
| + nodes map[string][]string // nodeName -> roles | |
| + expectedNodes []string // nodes that should be included | |
| + expectedSkipped []string // nodes that should be skipped | |
| + }{ | |
| + { | |
| + "mixed node types with validation", | |
| + map[string][]string{ | |
| + "cm-only": {RoleClusterManager}, // should be skipped | |
| + "master-only": {RoleMaster}, // should be skipped | |
| + "data-node": {RoleData}, // should be included | |
| + "mixed-good": {RoleClusterManager, RoleData}, // should be included | |
| + "search-only": {RoleSearch}, // should be included | |
| + }, | |
| + []string{"data-node", "mixed-good", "search-only"}, | |
| + []string{"cm-only", "master-only"}, | |
| + }, | |
| + { | |
| + "OpenSearch 3.X compliant setup", | |
| + map[string][]string{ | |
| + "dedicated-cm": {RoleClusterManager}, // should be skipped | |
| + "data-hot": {RoleData, RoleIngest}, // should be included | |
| + "data-warm": {RoleWarm, RoleData}, // should be included | |
| + "search-node": {RoleSearch}, // should be included | |
| + "coordinating": {RoleRemoteClusterClient}, // should be included | |
| + }, | |
| + []string{"data-hot", "data-warm", "search-node", "coordinating"}, | |
| + []string{"dedicated-cm"}, | |
| + }, | |
| + { | |
| + "cluster manager and remote cluster client filtering", | |
| + map[string][]string{ | |
| + "cm-rcc": {RoleClusterManager, RoleRemoteClusterClient}, // should be skipped | |
| + "cm-data": {RoleClusterManager, RoleData}, // should be included | |
| + "rcc-only": {RoleRemoteClusterClient}, // should be included | |
| + "data-node": {RoleData}, // should be included | |
| + }, | |
| + []string{"cm-data", "rcc-only", "data-node"}, | |
| + []string{"cm-rcc"}, | |
| + }, | |
| + } | |
| + | |
| + for _, tt := range tests { | |
| + t.Run(tt.name, func(t *testing.T) { | |
| + // Create mock transport that returns our test nodes | |
| + newRoundTripper := func() http.RoundTripper { | |
| + return &mockTransp{ | |
| + RoundTripFunc: func(req *http.Request) (*http.Response, error) { | |
| + nodes := make(map[string]map[string]nodeInfo) | |
| + nodes["nodes"] = make(map[string]nodeInfo) | |
| + | |
| + for name, roles := range tt.nodes { | |
| + nodes["nodes"][name] = nodeInfo{ | |
| + ID: name + "-id", | |
| + Name: name, | |
| + Roles: roles, | |
| + } | |
| + } | |
| + | |
| + b, _ := json.Marshal(nodes) | |
| + return &http.Response{ | |
| + Status: fmt.Sprintf("%d %s", http.StatusOK, http.StatusText(http.StatusOK)), | |
| + StatusCode: http.StatusOK, | |
| + ContentLength: int64(len(b)), | |
| + Header: http.Header(map[string][]string{"Content-Type": {"application/json"}}), | |
| + Body: io.NopCloser(bytes.NewReader(b)), | |
| + }, nil | |
| + }, | |
| + } | |
| + } | |
| + | |
| + u, _ := url.Parse("http://localhost:9200") | |
| + c, err := New(Config{ | |
| + URLs: []*url.URL{u}, | |
| + Transport: newRoundTripper(), | |
| + }) | |
| + require.NoError(t, err) | |
| + | |
| + // Perform discovery | |
| + err = c.DiscoverNodes() | |
| + assert.NoError(t, err) | |
| + | |
| + // Verify results | |
| + pool, ok := c.mu.pool.(*statusConnectionPool) | |
| + require.True(t, ok, "Expected statusConnectionPool") | |
| + | |
| + // Check that expected nodes are included | |
| + actualNodes := make(map[string]bool) | |
| + for _, conn := range pool.mu.live { | |
| + actualNodes[conn.Name] = true | |
| + } | |
| + | |
| + assert.Equal(t, len(tt.expectedNodes), len(actualNodes), | |
| + "Expected %d nodes but got %d: %v", len(tt.expectedNodes), len(actualNodes), actualNodes) | |
| + | |
| + for _, expectedNode := range tt.expectedNodes { | |
| + assert.True(t, actualNodes[expectedNode], | |
| + "Expected node %q to be included but it wasn't", expectedNode) | |
| + } | |
| + | |
| + for _, skippedNode := range tt.expectedSkipped { | |
| + assert.False(t, actualNodes[skippedNode], | |
| + "Expected node %q to be skipped but it was included", skippedNode) | |
| + } | |
| + }) | |
| + } | |
| +} | |
| + | |
| + // TestIncludeDedicatedClusterManagersConfiguration verifies that Config.IncludeDedicatedClusterManagers controls whether dedicated cluster-manager nodes are kept after discovery | |
| +func TestIncludeDedicatedClusterManagersConfiguration(t *testing.T) { | |
| + tests := []struct { | |
| + name string | |
| + includeDedicatedClusterManagers bool | |
| + nodes map[string][]string // nodeName -> roles | |
| + expectedIncluded []string // nodes that should be included | |
| + expectedExcluded []string // nodes that should be excluded | |
| + }{ | |
| + { | |
| + name: "IncludeDedicatedClusterManagers enabled - includes all nodes", | |
| + includeDedicatedClusterManagers: true, | |
| + nodes: map[string][]string{ | |
| + "cm-only": {RoleClusterManager}, | |
| + "data-node": {RoleData}, | |
| + }, | |
| + expectedIncluded: []string{"cm-only", "data-node"}, | |
| + expectedExcluded: []string{}, | |
| + }, | |
| + { | |
| + name: "IncludeDedicatedClusterManagers disabled (default) - excludes dedicated CM nodes", | |
| + includeDedicatedClusterManagers: false, | |
| + nodes: map[string][]string{ | |
| + "cm-only": {RoleClusterManager}, | |
| + "data-node": {RoleData}, | |
| + "dummy": {RoleData}, // Add second node to avoid single connection pool | |
| + }, | |
| + expectedIncluded: []string{"data-node", "dummy"}, | |
| + expectedExcluded: []string{"cm-only"}, | |
| + }, | |
| + { | |
| + name: "Mixed roles with CM always included regardless of setting", | |
| + includeDedicatedClusterManagers: false, | |
| + nodes: map[string][]string{ | |
| + "cm-data": {RoleClusterManager, RoleData}, | |
| + "dummy": {RoleData}, // Add second node to avoid single connection pool | |
| + }, | |
| + expectedIncluded: []string{"cm-data", "dummy"}, | |
| + expectedExcluded: []string{}, | |
| + }, | |
| + } | |
| + | |
| + for _, tt := range tests { | |
| + t.Run(tt.name, func(t *testing.T) { | |
| + // Create mock transport | |
| + newRoundTripper := func() http.RoundTripper { | |
| + return &mockTransp{ | |
| + RoundTripFunc: func(req *http.Request) (*http.Response, error) { | |
| + nodes := make(map[string]map[string]nodeInfo) | |
| + nodes["nodes"] = make(map[string]nodeInfo) | |
| + | |
| + for name, roles := range tt.nodes { | |
| + nodes["nodes"][name] = nodeInfo{ | |
| + ID: name + "-id", | |
| + Name: name, | |
| + Roles: roles, | |
| + } | |
| + } | |
| + | |
| + b, _ := json.Marshal(nodes) | |
| + return &http.Response{ | |
| + Status: fmt.Sprintf("%d %s", http.StatusOK, http.StatusText(http.StatusOK)), | |
| + StatusCode: http.StatusOK, | |
| + ContentLength: int64(len(b)), | |
| + Header: http.Header(map[string][]string{"Content-Type": {"application/json"}}), | |
| + Body: io.NopCloser(bytes.NewReader(b)), | |
| + }, nil | |
| + }, | |
| + } | |
| + } | |
| + | |
| + u, _ := url.Parse("http://localhost:9200") | |
| + c, err := New(Config{ | |
| + URLs: []*url.URL{u}, | |
| + Transport: newRoundTripper(), | |
| + IncludeDedicatedClusterManagers: tt.includeDedicatedClusterManagers, | |
| + }) | |
| + require.NoError(t, err) | |
| + | |
| + // Perform discovery | |
| + err = c.DiscoverNodes() | |
| + assert.NoError(t, err) | |
| + | |
| + // Verify results | |
| + pool, ok := c.mu.pool.(*statusConnectionPool) | |
| + require.True(t, ok, "Expected statusConnectionPool") | |
| + | |
| + // Check included nodes | |
| + actualNodes := make(map[string]bool) | |
| + for _, conn := range pool.mu.live { | |
| + actualNodes[conn.Name] = true | |
| + } | |
| + | |
| + for _, expectedNode := range tt.expectedIncluded { | |
| + assert.True(t, actualNodes[expectedNode], | |
| + "Expected node %q to be included but it wasn't", expectedNode) | |
| + } | |
| + | |
| + for _, excludedNode := range tt.expectedExcluded { | |
| + assert.False(t, actualNodes[excludedNode], | |
| + "Expected node %q to be excluded but it was included", excludedNode) | |
| + } | |
| + | |
| + // Verify total count | |
| + expectedTotal := len(tt.expectedIncluded) | |
| + assert.Equal(t, expectedTotal, len(actualNodes), | |
| + "Expected %d nodes but got %d", expectedTotal, len(actualNodes)) | |
| + }) | |
| + } | |
| +} | |
| + | |
| +// TestGenericRoleBasedSelector tests the new generic role-based selector | |
| +func TestGenericRoleBasedSelector(t *testing.T) { | |
| + connections := []*Connection{ | |
| + {Name: "data-node", Roles: []string{RoleData}}, | |
| + {Name: "ingest-node", Roles: []string{RoleIngest}}, | |
| + {Name: "data-ingest-node", Roles: []string{RoleData, RoleIngest}}, | |
| + {Name: "cluster-manager-node", Roles: []string{RoleClusterManager}}, | |
| + {Name: "coordinating-node", Roles: []string{}}, // No specific roles | |
| + } | |
| + | |
| + fallback := &mockSelector{} | |
| + | |
| + t.Run("Generic selector with required roles", func(t *testing.T) { | |
| + // Create a selector that requires either data or ingest roles | |
| + selector := NewRoleBasedSelector( | |
| + WithRequiredRoles(RoleData, RoleIngest), | |
| + ) | |
| + | |
| + conn, err := selector.Select(connections) | |
| + assert.NoError(t, err) | |
| + // Should match one of: data-node, ingest-node, or data-ingest-node | |
| + assert.Contains(t, []string{"data-node", "ingest-node", "data-ingest-node"}, conn.Name) | |
| + }) | |
| + | |
| + t.Run("Generic selector with excluded roles", func(t *testing.T) { | |
| + // Create a selector that excludes cluster manager roles | |
| + selector := NewRoleBasedSelector( | |
| + WithExcludedRoles(RoleClusterManager), | |
| + ) | |
| + | |
| + conn, err := selector.Select(connections) | |
| + assert.NoError(t, err) | |
| + // Should NOT be the cluster-manager-node | |
| + assert.NotEqual(t, "cluster-manager-node", conn.Name) | |
| + }) | |
| + | |
| + t.Run("Generic selector strict mode", func(t *testing.T) { | |
| + // Create a strict selector that only allows warm nodes | |
| + selector := NewRoleBasedSelector( | |
| + WithRequiredRoles(RoleWarm), | |
| + WithStrictMode(), | |
| + ) | |
| + | |
| + conn, err := selector.Select(connections) | |
| + assert.Error(t, err) | |
| + assert.Nil(t, conn) | |
| + assert.Contains(t, err.Error(), "no connections found matching required roles") | |
| + }) | |
| + | |
| + t.Run("Options pattern flexibility", func(t *testing.T) { | |
| + // Test that options pattern allows flexible configuration | |
| + ingestSelector := NewRoleBasedSelector( | |
| + WithRequiredRoles(RoleIngest), | |
| + WithFallback(fallback), | |
| + ) | |
| + | |
| + strictIngestSelector := NewRoleBasedSelector( | |
| + WithRequiredRoles(RoleWarm), // Try warm nodes (which don't exist) | |
| + WithStrictMode(), | |
| + ) | |
| + | |
| + conn1, err1 := ingestSelector.Select(connections) | |
| + conn2, err2 := strictIngestSelector.Select(connections) | |
| + | |
| + assert.NoError(t, err1) | |
| + assert.Error(t, err2) // Strict mode should fail with no warm nodes | |
| + assert.Nil(t, conn2) | |
| + // ingestSelector (configured with a fallback) should return an ingest-capable node | |
| + assert.Contains(t, []string{"ingest-node", "data-ingest-node"}, conn1.Name) | |
| + }) | |
| +} | |
| + | |
| +// TestRoleBasedSelectors tests the role-based selector with various configurations | |
| +func TestRoleBasedSelectors(t *testing.T) { | |
| + // Create test connections with different roles | |
| + connections := []*Connection{ | |
| + {Name: "data-node", Roles: []string{RoleData}}, | |
| + {Name: "ingest-node", Roles: []string{RoleIngest}}, | |
| + {Name: "data-ingest-node", Roles: []string{RoleData, RoleIngest}}, | |
| + {Name: "cluster-manager-node", Roles: []string{RoleClusterManager}}, | |
| + {Name: "warm-node", Roles: []string{RoleWarm}}, | |
| + {Name: "search-node", Roles: []string{RoleSearch}}, | |
| + {Name: "coordinating-node", Roles: []string{}}, // No specific roles | |
| + } | |
| + | |
| + // Mock fallback selector that just returns the first connection | |
| + fallback := &mockSelector{} | |
| + | |
| + t.Run("IngestPreferred", func(t *testing.T) { | |
| + selector := NewRoleBasedSelector( | |
| + WithRequiredRoles(RoleIngest), | |
| + WithFallback(fallback), | |
| + ) | |
| + | |
| + // Should prefer ingest nodes | |
| + conn, err := selector.Select(connections) | |
| + assert.NoError(t, err) | |
| + // Should get either "ingest-node" or "data-ingest-node" | |
| + assert.Contains(t, []string{"ingest-node", "data-ingest-node"}, conn.Name) | |
| + | |
| + // Should fall back when no ingest nodes available | |
| + dataOnlyConns := []*Connection{connections[0], connections[3]} // data and cluster-manager | |
| + conn, err = selector.Select(dataOnlyConns) | |
| + assert.NoError(t, err) | |
| + assert.Equal(t, "data-node", conn.Name) // Fallback should work | |
| + }) | |
| + | |
| + t.Run("DataPreferred", func(t *testing.T) { | |
| + selector := NewRoleBasedSelector( | |
| + WithRequiredRoles(RoleData), | |
| + WithFallback(fallback), | |
| + ) | |
| + | |
| + conn, err := selector.Select(connections) | |
| + assert.NoError(t, err) | |
| + // Should get a data-capable node | |
| + assert.Contains(t, []string{"data-node", "data-ingest-node"}, conn.Name) | |
| + }) | |
| + | |
| + t.Run("WarmPreferred", func(t *testing.T) { | |
| + selector := NewRoleBasedSelector( | |
| + WithRequiredRoles(RoleWarm), | |
| + WithFallback(fallback), | |
| + ) | |
| + | |
| + conn, err := selector.Select(connections) | |
| + assert.NoError(t, err) | |
| + assert.Equal(t, "warm-node", conn.Name) | |
| + | |
| + // Should fall back when no warm nodes available | |
| + noWarmConns := []*Connection{connections[0], connections[1]} // data and ingest | |
| + conn, err = selector.Select(noWarmConns) | |
| + assert.NoError(t, err) | |
| + assert.Equal(t, "data-node", conn.Name) // Fallback should work | |
| + }) | |
| + | |
| + t.Run("IngestOnly", func(t *testing.T) { | |
| + selector := NewRoleBasedSelector( | |
| + WithRequiredRoles(RoleIngest), | |
| + WithStrictMode(), | |
| + ) | |
| + | |
| + // Should work when ingest nodes are available | |
| + conn, err := selector.Select(connections) | |
| + assert.NoError(t, err) | |
| + assert.Contains(t, []string{"ingest-node", "data-ingest-node"}, conn.Name) | |
| + | |
| + // Should fail when no ingest nodes available (strict mode) | |
| + dataOnlyConns := []*Connection{connections[0], connections[3]} // data and cluster-manager | |
| + conn, err = selector.Select(dataOnlyConns) | |
| + assert.Error(t, err) | |
| + assert.Nil(t, conn) | |
| + assert.Contains(t, err.Error(), "no connections found matching required roles") | |
| + }) | |
| +} | |
| + | |
| +// TestSmartSelector tests the request-aware smart selector | |
| +func TestSmartSelector(t *testing.T) { | |
| + // Create test connections | |
| + connections := []*Connection{ | |
| + {Name: "data-node", Roles: []string{RoleData}}, | |
| + {Name: "ingest-node", Roles: []string{RoleIngest}}, | |
| + {Name: "data-ingest-node", Roles: []string{RoleData, RoleIngest}}, | |
| + } | |
| + | |
| + fallback := &mockSelector{} | |
| + selector := NewSmartSelector(fallback) | |
| + | |
| + t.Run("IngestOperationRouting", func(t *testing.T) { | |
| + req := &mockRequest{ | |
| + method: "POST", | |
| + path: "/my-index/_bulk", | |
| + } | |
| + | |
| + conn, err := selector.SelectForRequest(connections, req) | |
| + assert.NoError(t, err) | |
| + // Should route to ingest-capable node for bulk operations | |
| + assert.Contains(t, []string{"ingest-node", "data-ingest-node"}, conn.Name) | |
| + }) | |
| + | |
| + t.Run("SearchOperationRouting", func(t *testing.T) { | |
| + req := &mockRequest{ | |
| + method: "POST", | |
| + path: "/my-index/_search", | |
| + } | |
| + | |
| + conn, err := selector.SelectForRequest(connections, req) | |
| + assert.NoError(t, err) | |
| + // Should route to data-capable node for search operations | |
| + assert.Contains(t, []string{"data-node", "data-ingest-node"}, conn.Name) | |
| + }) | |
| + | |
| + t.Run("DefaultOperationRouting", func(t *testing.T) { | |
| + req := &mockRequest{ | |
| + method: "GET", | |
| + path: "/_cluster/health", | |
| + } | |
| + | |
| + conn, err := selector.SelectForRequest(connections, req) | |
| + assert.NoError(t, err) | |
| + // Should use default routing | |
| + assert.Equal(t, "data-node", conn.Name) // Mock selector returns first connection | |
| + }) | |
| + | |
| + t.Run("BasicSelectorInterface", func(t *testing.T) { | |
| + // Test that SmartSelector implements basic Selector interface | |
| + conn, err := selector.Select(connections) | |
| + assert.NoError(t, err) | |
| + assert.Equal(t, "data-node", conn.Name) // Should use default selector | |
| + }) | |
| +} | |
| + | |
| +// TestRequestAwareConnectionPool tests the enhanced connection pool | |
| +func TestRequestAwareConnectionPool(t *testing.T) { | |
| + connections := []*Connection{ | |
| + {Name: "data-node", Roles: []string{RoleData}}, | |
| + {Name: "ingest-node", Roles: []string{RoleIngest}}, | |
| + } | |
| + | |
| + smartSelector := NewSmartSelector(&mockSelector{}) | |
| + pool := NewConnectionPool(connections, smartSelector) | |
| + | |
| + // Verify it implements both interfaces | |
| + _, ok := pool.(ConnectionPool) | |
| + assert.True(t, ok, "Should implement ConnectionPool") | |
| + | |
| + racp, ok := pool.(RequestAwareConnectionPool) | |
| + assert.True(t, ok, "Should implement RequestAwareConnectionPool") | |
| + | |
| + t.Run("NextForRequest", func(t *testing.T) { | |
| + req := &mockRequest{ | |
| + method: "POST", | |
| + path: "/my-index/_bulk", | |
| + } | |
| + | |
| + conn, err := racp.NextForRequest(req) | |
| + assert.NoError(t, err) | |
| + assert.Equal(t, "ingest-node", conn.Name) // Should route to ingest node for bulk | |
| + }) | |
| + | |
| + t.Run("BackwardCompatibilityNext", func(t *testing.T) { | |
| + conn, err := pool.Next() | |
| + assert.NoError(t, err) | |
| + assert.Equal(t, "data-node", conn.Name) // Default behavior | |
| + }) | |
| +} | |
| + | |
| +// TestOperationDetection tests the operation detection logic | |
| +func TestOperationDetection(t *testing.T) { | |
| + tests := []struct { | |
| + method string | |
| + path string | |
| + isIngest bool | |
| + isSearch bool | |
| + }{ | |
| + // Ingest operations | |
| + {"POST", "/my-index/_bulk", true, false}, | |
| + {"PUT", "/_ingest/pipeline/my-pipeline", true, false}, | |
| + {"POST", "/_ingest/pipeline/_simulate", true, false}, | |
| + | |
| + // Search operations | |
| + {"GET", "/my-index/_search", false, true}, | |
| + {"POST", "/my-index/_search", false, true}, | |
| + {"GET", "/_search", false, true}, | |
| + {"POST", "/_msearch", false, true}, | |
| + {"GET", "/my-index/_doc/1", false, true}, | |
| + | |
| + // Other operations | |
| + {"GET", "/_cluster/health", false, false}, | |
| + {"PUT", "/my-index", false, false}, | |
| + {"DELETE", "/my-index", false, false}, | |
| + {"GET", "/_cat/indices", false, false}, | |
| + } | |
| + | |
| + for _, test := range tests { | |
| + t.Run(test.method+"_"+test.path, func(t *testing.T) { | |
| + assert.Equal(t, test.isIngest, isIngestOperation(test.path, test.method)) | |
| + assert.Equal(t, test.isSearch, isSearchOperation(test.path, test.method)) | |
| + }) | |
| + } | |
| +} | |
| + | |
| +// Mock implementations for testing | |
| + | |
| +type mockSelector struct{} | |
| + | |
| +func (s *mockSelector) Select(connections []*Connection) (*Connection, error) { | |
| + if len(connections) == 0 { | |
| + return nil, errors.New("no connections") | |
| + } | |
| + return connections[0], nil // Always return first connection | |
| +} | |
| + | |
| +type mockRequest struct { | |
| + method string | |
| + path string | |
| + headers map[string]string | |
| +} | |
| + | |
| +func (r *mockRequest) GetMethod() string { return r.method } | |
| +func (r *mockRequest) GetPath() string { return r.path } | |
| +func (r *mockRequest) GetHeaders() map[string]string { | |
| + if r.headers == nil { | |
| + return make(map[string]string) | |
| + } | |
| + return r.headers | |
| +} | |
| + | |
| +// TestConnectionPreservationAcrossDiscovery verifies that connections are properly preserved during discovery | |
| +func TestConnectionPreservationAcrossDiscovery(t *testing.T) { | |
| + tests := []struct { | |
| + name string | |
| + initialNodes map[string][]string // nodeName -> roles | |
| + discoveredNodes map[string][]string // nodeName -> roles after discovery | |
| + expectedPreserved []string // connections that should be preserved | |
| + expectedNew []string // new connections that should be added | |
| + expectedDiscarded []string // connections that should be removed | |
| + simulateDeadNodes []string // nodes to mark as dead before discovery | |
| + }{ | |
| + { | |
| + name: "preserve live connections for existing nodes", | |
| + initialNodes: map[string][]string{ | |
| + "node1": {RoleData}, | |
| + "node2": {RoleIngest}, | |
| + "node3": {RoleClusterManager}, | |
| + }, | |
| + discoveredNodes: map[string][]string{ | |
| + "node1": {RoleData}, | |
| + "node2": {RoleIngest}, | |
| + "node4": {RoleData}, // new node | |
| + }, | |
| + expectedPreserved: []string{"node1", "node2"}, | |
| + expectedNew: []string{"node4"}, | |
| + expectedDiscarded: []string{"node3"}, | |
| + }, | |
| + { | |
| + name: "preserve dead connections with resurrection timers", | |
| + initialNodes: map[string][]string{ | |
| + "node1": {RoleData}, | |
| + "node2": {RoleIngest}, | |
| + }, | |
| + discoveredNodes: map[string][]string{ | |
| + "node1": {RoleData}, | |
| + "node2": {RoleIngest}, | |
| + }, | |
| + expectedPreserved: []string{"node1", "node2"}, | |
| + expectedNew: []string{}, | |
| + expectedDiscarded: []string{}, | |
| + simulateDeadNodes: []string{"node2"}, // node2 should remain dead but preserved | |
| + }, | |
| + { | |
| + name: "remove connections for nodes no longer in cluster", | |
| + initialNodes: map[string][]string{ | |
| + "node1": {RoleData}, | |
| + "node2": {RoleIngest}, | |
| + "node3": {RoleClusterManager, RoleData}, | |
| + }, | |
| + discoveredNodes: map[string][]string{ | |
| + "node1": {RoleData}, // only node1 remains | |
| + }, | |
| + expectedPreserved: []string{"node1"}, | |
| + expectedNew: []string{}, | |
| + expectedDiscarded: []string{"node2", "node3"}, | |
| + }, | |
| + } | |
| + | |
| + for _, tt := range tests { | |
| + t.Run(tt.name, func(t *testing.T) { | |
| + // Create initial connections | |
| + var initialConns []*Connection | |
| + for name, roles := range tt.initialNodes { | |
| + u, _ := url.Parse(fmt.Sprintf("http://%s:9200", name)) | |
| + conn := &Connection{ | |
| + URL: u, | |
| + Name: name, | |
| + Roles: roles, | |
| + } | |
| + initialConns = append(initialConns, conn) | |
| + } | |
| + | |
| + // Create initial pool | |
| + pool := newStatusConnectionPool(initialConns, []*Connection{}, NewRoundRobinSelector()) | |
| + | |
| + // Simulate dead nodes | |
| + for _, deadNodeName := range tt.simulateDeadNodes { | |
| + for _, conn := range pool.mu.live { | |
| + if conn.Name == deadNodeName { | |
| + conn.markAsDead() | |
| + // Move to dead list manually for test | |
| + pool.mu.dead = append(pool.mu.dead, conn) | |
| + // Remove from live list | |
| + for i, liveConn := range pool.mu.live { | |
| + if liveConn == conn { | |
| + pool.mu.live = append(pool.mu.live[:i], pool.mu.live[i+1:]...) | |
| + break | |
| + } | |
| + } | |
| + break | |
| + } | |
| + } | |
| + } | |
| + | |
| + // Create client with the initial pool | |
| + client := &Client{ | |
| + selector: NewRoundRobinSelector(), | |
| + } | |
| + client.mu.pool = pool | |
| + | |
| + // Simulate discovered connections | |
| + var discoveredConns []*Connection | |
| + for name, roles := range tt.discoveredNodes { | |
| + u, _ := url.Parse(fmt.Sprintf("http://%s:9200", name)) | |
| + conn := &Connection{ | |
| + URL: u, | |
| + Name: name, | |
| + Roles: roles, | |
| + } | |
| + discoveredConns = append(discoveredConns, conn) | |
| + } | |
| + | |
| + // Test the updateConnPool method | |
| + newPool := client.updateConnPool(discoveredConns, client.selector) | |
| + statusPool, ok := newPool.(*statusConnectionPool) | |
| + require.True(t, ok, "Expected statusConnectionPool") | |
| + | |
| + // Verify preserved connections by checking object identity | |
| + allConnections := statusPool.connections() | |
| + actualPreserved := make(map[string]struct{}) | |
| + actualNew := make(map[string]struct{}) | |
| + | |
| + for _, conn := range allConnections { | |
| + // Check if this was an original connection by object identity | |
| + wasPreserved := false | |
| + for _, orig := range initialConns { | |
| + if orig == conn { // Same object reference | |
| + actualPreserved[conn.Name] = struct{}{} | |
| + wasPreserved = true | |
| + break | |
| + } | |
| + } | |
| + | |
| + if !wasPreserved { | |
| + actualNew[conn.Name] = struct{}{} | |
| + } | |
| + } | |
| + | |
| + // Verify expected preserved connections | |
| + for _, expectedName := range tt.expectedPreserved { | |
| + _, exists := actualPreserved[expectedName] | |
| + assert.True(t, exists, | |
| + "Expected connection %q to be preserved", expectedName) | |
| + } | |
| + | |
| + // Verify expected new connections | |
| + for _, expectedName := range tt.expectedNew { | |
| + _, exists := actualNew[expectedName] | |
| + assert.True(t, exists, | |
| + "Expected connection %q to be new", expectedName) | |
| + } | |
| + | |
| + // Verify discarded connections are not present | |
| + for _, discardedName := range tt.expectedDiscarded { | |
| + found := false | |
| + for _, conn := range allConnections { | |
| + if conn.Name == discardedName { | |
| + found = true | |
| + break | |
| + } | |
| + } | |
| + assert.False(t, found, | |
| + "Expected connection %q to be discarded", discardedName) | |
| + } | |
| + | |
| + // Verify dead connections maintain their status if preserved | |
| + for _, deadNodeName := range tt.simulateDeadNodes { | |
| + if slices.Contains(tt.expectedPreserved, deadNodeName) { | |
| + found := false | |
| + for _, conn := range statusPool.mu.dead { | |
| + if conn.Name == deadNodeName { | |
| + conn.mu.Lock() | |
| + isDead := conn.mu.isDead | |
| + conn.mu.Unlock() | |
| + assert.True(t, isDead, | |
| + "Expected preserved dead connection %q to remain dead", deadNodeName) | |
| + found = true | |
| + break | |
| + } | |
| + } | |
| + assert.True(t, found, | |
| + "Expected dead connection %q to be in dead list", deadNodeName) | |
| + } | |
| + } | |
| + }) | |
| + } | |
| +} | |
| + | |
| +// TestConnectionPreservationResurrectionTimers tests that a preserved dead connection keeps its dead flag, failure count, and deadSince timestamp across a pool update | |
| +func TestConnectionPreservationResurrectionTimers(t *testing.T) { | |
| + // Create a connection and mark it dead | |
| + u, _ := url.Parse("http://node1:9200") | |
| + conn := &Connection{ | |
| + URL: u, | |
| + Name: "node1", | |
| + Roles: []string{RoleData}, | |
| + } | |
| + conn.mu.isDead = true | |
| + conn.mu.deadSince = time.Now().UTC().Add(-30 * time.Second) // Dead for 30 seconds | |
| + conn.failures.Store(3) | |
| + | |
| + // Create pool with dead connection | |
| + pool := newStatusConnectionPool([]*Connection{}, []*Connection{conn}, NewRoundRobinSelector()) | |
| + | |
| + client := &Client{ | |
| + selector: NewRoundRobinSelector(), | |
| + } | |
| + client.mu.pool = pool | |
| + | |
| + // Simulate discovery that finds the same node | |
| + discoveredConns := []*Connection{ | |
| + { | |
| + URL: u, | |
| + Name: "node1", | |
| + Roles: []string{RoleData}, | |
| + }, | |
| + } | |
| + | |
| + // Update the pool | |
| + newPool := client.updateConnPool(discoveredConns, client.selector) | |
| + statusPool, ok := newPool.(*statusConnectionPool) | |
| + require.True(t, ok) | |
| + | |
| + // Verify the dead connection was preserved with its state | |
| + require.Len(t, statusPool.mu.dead, 1, "Should have preserved the dead connection") | |
| + preservedConn := statusPool.mu.dead[0] | |
| + | |
| + assert.Same(t, conn, preservedConn, "Should be the exact same connection object") | |
| + | |
| + // Get original connection's deadSince for comparison | |
| + conn.mu.Lock() | |
| + origDeadSince := conn.mu.deadSince | |
| + conn.mu.Unlock() | |
| + | |
| + preservedConn.mu.Lock() | |
| + isDead := preservedConn.mu.isDead | |
| + deadSince := preservedConn.mu.deadSince | |
| + preservedConn.mu.Unlock() | |
| + | |
| + assert.True(t, isDead, "Connection should remain dead") | |
| + assert.Equal(t, int64(3), preservedConn.failures.Load(), "Failure count should be preserved") | |
| + assert.WithinDuration(t, origDeadSince, deadSince, time.Second, | |
| + "DeadSince timestamp should be preserved") | |
| +} | |
| + | |
| +func TestDiscoveryTimeoutBehavior(t *testing.T) { | |
| + tests := []struct { | |
| + name string | |
| + discoveryTimeout time.Duration | |
| + description string | |
| + }{ | |
| + { | |
| + name: "default timeout behavior", | |
| + discoveryTimeout: 0, // Uses default 30s timeout | |
| + description: "Should use default 30s timeout when not configured", | |
| + }, | |
| + { | |
| + name: "custom short timeout", | |
| + discoveryTimeout: 100 * time.Millisecond, | |
| + description: "Should use custom short timeout", | |
| + }, | |
| + { | |
| + name: "no timeout behavior", | |
| + discoveryTimeout: -1 * time.Second, // Disables timeout | |
| + description: "Should have no timeout when negative value is used", | |
| + }, | |
| + } | |
| + | |
| + for _, tt := range tests { | |
| + t.Run(tt.name, func(t *testing.T) { | |
| + // Create a slow mock transport to test timeout behavior | |
| + slowTransport := &slowMockTransport{ | |
| + delay: 200 * time.Millisecond, // Always takes 200ms | |
| + } | |
| + | |
| + urls := []*url.URL{ | |
| + {Scheme: "http", Host: "localhost:9200"}, | |
| + } | |
| + | |
| + cfg := Config{ | |
| + URLs: urls, | |
| + Transport: slowTransport, | |
| + DiscoveryTimeout: tt.discoveryTimeout, | |
| + } | |
| + | |
| + client, err := New(cfg) | |
| + require.NoError(t, err, "Should create client successfully") | |
| + | |
| + // Test the timeout behavior | |
| + start := time.Now() | |
| + err = client.DiscoverNodes() | |
| + elapsed := time.Since(start) | |
| + | |
| + switch { | |
| + case tt.discoveryTimeout == 0: | |
| + // Default timeout (30s): the mock responds after 200ms, well within the deadline, | |
| + // so discovery should complete successfully without timing out | |
| + assert.NoError(t, err, "Discovery should complete with default timeout") | |
| + assert.Less(t, elapsed, 1*time.Second, "Should complete quickly with default timeout") | |
| + | |
| + case tt.discoveryTimeout > 0 && tt.discoveryTimeout < 200*time.Millisecond: | |
| + // Short timeout - should timeout before completion | |
| + require.Error(t, err, "Discovery should timeout with short timeout") | |
| + assert.Contains(t, err.Error(), "context deadline exceeded", "Should be a timeout error") | |
| + | |
| + case tt.discoveryTimeout < 0: | |
| + // No timeout - should complete successfully | |
| + assert.NoError(t, err, "Discovery should complete without timeout") | |
| + assert.GreaterOrEqual(t, elapsed, 200*time.Millisecond, "Should take at least as long as mock delay") | |
| + | |
| + default: | |
| + // Long enough timeout - should complete successfully | |
| + assert.NoError(t, err, "Discovery should complete with sufficient timeout") | |
| + } | |
| + }) | |
| + } | |
| +} | |
| + | |
| +// slowMockTransport simulates a slow network by adding delay to requests | |
| +type slowMockTransport struct { | |
| + delay time.Duration | |
| +} | |
| + | |
| +func (t *slowMockTransport) RoundTrip(req *http.Request) (*http.Response, error) { | |
| + // Check for context cancellation during delay | |
| + ctx := req.Context() | |
| + timer := time.NewTimer(t.delay) | |
| + defer timer.Stop() | |
| + | |
| + select { | |
| + case <-ctx.Done(): | |
| + return nil, ctx.Err() | |
| + case <-timer.C: | |
| + // Delay completed, proceed with response | |
| + } | |
| + | |
| + // Return a valid nodes response | |
| + nodesResponse := map[string]interface{}{ | |
| + "nodes": map[string]interface{}{ | |
| + "node1": map[string]interface{}{ | |
| + "name": "test-node-1", | |
| + "roles": []string{"data", "ingest"}, | |
| + "http": map[string]interface{}{ | |
| + "publish_address": "127.0.0.1:9200", | |
| + }, | |
| + }, | |
| + }, | |
| + } | |
| + | |
| + body, _ := json.Marshal(nodesResponse) | |
| + return &http.Response{ | |
| + StatusCode: 200, | |
| + Body: io.NopCloser(bytes.NewReader(body)), | |
| + Header: make(http.Header), | |
| + }, nil | |
| +} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| diff --git a/opensearchtransport/metrics.go b/opensearchtransport/metrics.go | |
| index 442d04a..3c5b94f 100644 | |
| --- a/opensearchtransport/metrics.go | |
| +++ b/opensearchtransport/metrics.go | |
| @@ -32,6 +32,7 @@ import ( | |
| "strconv" | |
| "strings" | |
| "sync" | |
| + "sync/atomic" | |
| "time" | |
| ) | |
| @@ -70,11 +71,20 @@ type ConnectionMetric struct { | |
| // metrics represents the inner state of metrics. | |
| type metrics struct { | |
| - sync.RWMutex | |
| + requests atomic.Int64 | |
| + failures atomic.Int64 | |
| - requests int | |
| - failures int | |
| - responses map[int]int | |
| + mu struct { | |
| + sync.RWMutex | |
| + responses map[int]int | |
| + } | |
| +} | |
| + | |
| +// incrementResponse increments the counter for the given status code. | |
| +func (m *metrics) incrementResponse(statusCode int) { | |
| + m.mu.Lock() | |
| + m.mu.responses[statusCode]++ | |
| + m.mu.Unlock() | |
| } | |
| // Metrics returns the transport metrics. | |
| @@ -82,33 +92,45 @@ func (c *Client) Metrics() (Metrics, error) { | |
| if c.metrics == nil { | |
| return Metrics{}, errors.New("transport metrics not enabled") | |
| } | |
| - c.metrics.RLock() | |
| - defer c.metrics.RUnlock() | |
| - if lockable, ok := c.pool.(sync.Locker); ok { | |
| - lockable.Lock() | |
| - defer lockable.Unlock() | |
| + // Acquire locks in proper hierarchy order to prevent deadlock: | |
| + // 1. First acquire pool lock (higher in hierarchy) - use RLock since we're only reading | |
| + if rwLock, ok := c.mu.pool.(rwLocker); ok { | |
| + rwLock.RLock() | |
| + defer rwLock.RUnlock() | |
| + } | |
| + | |
| + // 2. Then acquire metrics lock (lower in hierarchy) - RLock for reading | |
| + c.metrics.mu.RLock() | |
| + responses := make(map[int]int, len(c.metrics.mu.responses)) | |
| + for statusCode, count := range c.metrics.mu.responses { | |
| + responses[statusCode] = count | |
| } | |
| + c.metrics.mu.RUnlock() | |
| m := Metrics{ | |
| - Requests: c.metrics.requests, | |
| - Failures: c.metrics.failures, | |
| - Responses: c.metrics.responses, | |
| + Requests: int(c.metrics.requests.Load()), | |
| + Failures: int(c.metrics.failures.Load()), | |
| + Responses: responses, | |
| } | |
| - if pool, ok := c.pool.(connectionable); ok { | |
| + if pool, ok := c.mu.pool.(connectionable); ok { | |
| connections := pool.connections() | |
| - for idx, c := range connections { | |
| - c.Lock() | |
| + for _, c := range connections { | |
| + c.mu.RLock() | |
| + isDead := c.mu.isDead | |
| + deadSince := c.mu.deadSince | |
| + c.mu.RUnlock() | |
| cm := ConnectionMetric{ | |
| URL: c.URL.String(), | |
| - IsDead: c.IsDead, | |
| - Failures: c.Failures, | |
| + IsDead: isDead, | |
| + Failures: int(c.failures.Load()), | |
| } | |
| - if !c.DeadSince.IsZero() { | |
| - cm.DeadSince = &connections[idx].DeadSince | |
| + if !deadSince.IsZero() { | |
| + deadSinceCopy := deadSince | |
| + cm.DeadSince = &deadSinceCopy | |
| } | |
| if c.ID != "" { | |
| @@ -124,7 +146,6 @@ func (c *Client) Metrics() (Metrics, error) { | |
| } | |
| m.Connections = append(m.Connections, cm) | |
| - c.Unlock() | |
| } | |
| } | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| diff --git a/opensearchtransport/metrics_internal_test.go b/opensearchtransport/metrics_internal_test.go | |
| index d0265e5..f7f4eca 100644 | |
| --- a/opensearchtransport/metrics_internal_test.go | |
| +++ b/opensearchtransport/metrics_internal_test.go | |
| @@ -51,10 +51,10 @@ func TestMetrics(t *testing.T) { | |
| }, | |
| ) | |
| - tp.metrics.requests = 3 | |
| - tp.metrics.failures = 4 | |
| - tp.metrics.responses[200] = 1 | |
| - tp.metrics.responses[404] = 2 | |
| + tp.metrics.requests.Store(3) | |
| + tp.metrics.failures.Store(4) | |
| + tp.metrics.mu.responses[200] = 1 | |
| + tp.metrics.mu.responses[404] = 2 | |
| req, _ := http.NewRequest(http.MethodHead, "/", nil) | |
| resp, err := tp.Perform(req) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| diff --git a/opensearchtransport/opensearchtransport.go b/opensearchtransport/opensearchtransport.go | |
| index 8257143..c224e69 100644 | |
| --- a/opensearchtransport/opensearchtransport.go | |
| +++ b/opensearchtransport/opensearchtransport.go | |
| @@ -28,6 +28,7 @@ package opensearchtransport | |
| import ( | |
| "bytes" | |
| + "context" | |
| "crypto/x509" | |
| "errors" | |
| "fmt" | |
| @@ -81,8 +82,23 @@ type Config struct { | |
| EnableMetrics bool | |
| EnableDebugLogger bool | |
| + DiscoverNodesOnStart bool // Discover nodes synchronously when initializing the client (blocks until complete). Default: false. | |
| DiscoverNodesInterval time.Duration | |
| + // DiscoveryTimeout controls the timeout for node discovery operations. | |
| + // - Zero value (0): uses default timeout (30 seconds) | |
| + // - Negative value: no timeout (discovery may block indefinitely until context cancellation) | |
| + // - Positive value: uses the specified timeout duration | |
| + // Default: 0 (uses 30 second default timeout) | |
| + DiscoveryTimeout time.Duration | |
| + | |
| + // IncludeDedicatedClusterManagers includes dedicated cluster manager nodes in request routing. | |
| + // When false (default), dedicated cluster manager nodes are excluded from client requests, | |
| + // following best practices and matching the Java client's NodeSelector.SKIP_DEDICATED_CLUSTER_MANAGERS behavior. | |
| + // When true, all nodes including dedicated cluster managers can receive client requests. | |
| + // Default: false (excludes dedicated cluster managers for better performance) | |
| + IncludeDedicatedClusterManagers bool | |
| + | |
| Transport http.RoundTripper | |
| Logger Logger | |
| Selector Selector | |
| @@ -92,8 +108,6 @@ type Config struct { | |
| // Client represents the HTTP client. | |
| type Client struct { | |
| - sync.Mutex | |
| - | |
| urls []*url.URL | |
| username string | |
| password string | |
| @@ -107,8 +121,11 @@ type Client struct { | |
| enableRetryOnTimeout bool | |
| maxRetries int | |
| retryBackoff func(attempt int) time.Duration | |
| + discoverNodesOnStart bool | |
| discoverNodesInterval time.Duration | |
| - discoverNodesTimer *time.Timer | |
| + discoveryTimeout time.Duration | |
| + | |
| + includeDedicatedClusterManagers bool | |
| compressRequestBody bool | |
| pooledGzipCompressor *gzipCompressor | |
| @@ -118,8 +135,13 @@ type Client struct { | |
| transport http.RoundTripper | |
| logger Logger | |
| selector Selector | |
| - pool ConnectionPool | |
| poolFunc func([]*Connection, Selector) ConnectionPool | |
| + | |
| + mu struct { | |
| + sync.RWMutex | |
| + pool ConnectionPool | |
| + discoverNodesTimer *time.Timer | |
| + } | |
| } | |
| // New creates new transport client. | |
| @@ -172,7 +194,11 @@ func New(cfg Config) (*Client, error) { | |
| enableRetryOnTimeout: cfg.EnableRetryOnTimeout, | |
| maxRetries: cfg.MaxRetries, | |
| retryBackoff: cfg.RetryBackoff, | |
| + discoverNodesOnStart: cfg.DiscoverNodesOnStart, | |
| discoverNodesInterval: cfg.DiscoverNodesInterval, | |
| + discoveryTimeout: cfg.DiscoveryTimeout, | |
| + | |
| + includeDedicatedClusterManagers: cfg.IncludeDedicatedClusterManagers, | |
| compressRequestBody: cfg.CompressRequestBody, | |
| @@ -185,9 +211,9 @@ func New(cfg Config) (*Client, error) { | |
| client.userAgent = initUserAgent() | |
| if client.poolFunc != nil { | |
| - client.pool = client.poolFunc(conns, client.selector) | |
| + client.mu.pool = client.poolFunc(conns, client.selector) | |
| } else { | |
| - client.pool = NewConnectionPool(conns, client.selector) | |
| + client.mu.pool = NewConnectionPool(conns, client.selector) | |
| } | |
| if cfg.EnableDebugLogger { | |
| @@ -195,20 +221,38 @@ func New(cfg Config) (*Client, error) { | |
| } | |
| if cfg.EnableMetrics { | |
| - client.metrics = &metrics{responses: make(map[int]int)} | |
| + client.metrics = &metrics{} | |
| + client.metrics.mu.responses = make(map[int]int) | |
| // TODO(karmi): Type assertion to interface | |
| - if pool, ok := client.pool.(*singleConnectionPool); ok { | |
| + if pool, ok := client.mu.pool.(*singleConnectionPool); ok { | |
| pool.metrics = client.metrics | |
| } | |
| - if pool, ok := client.pool.(*statusConnectionPool); ok { | |
| + if pool, ok := client.mu.pool.(*statusConnectionPool); ok { | |
| pool.metrics = client.metrics | |
| } | |
| } | |
| + // Synchronous discovery on startup if requested | |
| + if client.discoverNodesOnStart { | |
| + var discoveryCtx context.Context | |
| + var cancel context.CancelFunc | |
| + | |
| + if timeout, useTimeout := client.resolveDiscoveryTimeout(); useTimeout { | |
| + // Apply the configured or default timeout | |
| + discoveryCtx, cancel = context.WithTimeout(context.Background(), timeout) | |
| + defer cancel() | |
| + } else { | |
| + // No timeout - use background context as-is | |
| + discoveryCtx = context.Background() | |
| + } | |
| + | |
| + if err := client.DiscoverNodesWithContext(discoveryCtx); err != nil { | |
| + return nil, fmt.Errorf("initial node discovery failed: %w", err) | |
| + } | |
| + } | |
| + | |
| if client.discoverNodesInterval > 0 { | |
| - time.AfterFunc(client.discoverNodesInterval, func() { | |
| - client.scheduleDiscoverNodes() | |
| - }) | |
| + client.scheduleDiscoverNodes(context.Background()) | |
| } | |
| if cfg.CompressRequestBody { | |
| @@ -218,6 +262,24 @@ func New(cfg Config) (*Client, error) { | |
| return &client, nil | |
| } | |
| +// resolveDiscoveryTimeout returns the timeout configuration for discovery operations. | |
| +// Returns: (duration, timeoutEnabled) | |
| +// - duration: the timeout duration to use (only meaningful if timeoutEnabled is true) | |
| +// - timeoutEnabled: true if a timeout should be applied | |
| +func (c *Client) resolveDiscoveryTimeout() (time.Duration, bool) { | |
| + switch { | |
| + case c.discoveryTimeout == 0: | |
| + // Zero value: use default timeout (30s) | |
| + return defaultDiscoveryTimeout, true | |
| + case c.discoveryTimeout < 0: | |
| + // Negative value: no timeout (may block until context cancellation) | |
| + return 0, false | |
| + default: | |
| + // Positive value: use configured timeout | |
| + return c.discoveryTimeout, true | |
| + } | |
| +} | |
| + | |
| // Perform executes the request and returns a response or error. | |
| func (c *Client) Perform(req *http.Request) (*http.Response, error) { | |
| var ( | |
| @@ -227,9 +289,7 @@ func (c *Client) Perform(req *http.Request) (*http.Response, error) { | |
| // Record metrics, when enabled | |
| if c.metrics != nil { | |
| - c.metrics.Lock() | |
| - c.metrics.requests++ | |
| - c.metrics.Unlock() | |
| + c.metrics.requests.Add(1) | |
| } | |
| // Update request | |
| @@ -278,9 +338,9 @@ func (c *Client) Perform(req *http.Request) (*http.Response, error) { | |
| ) | |
| // Get connection from the pool | |
| - c.Lock() | |
| - conn, err = c.pool.Next() | |
| - c.Unlock() | |
| + c.mu.RLock() | |
| + conn, err = c.mu.pool.Next() | |
| + c.mu.RUnlock() | |
| if err != nil { | |
| if c.logger != nil { | |
| c.logRoundTrip(req, nil, err, time.Time{}, time.Duration(0)) | |
| @@ -321,16 +381,14 @@ func (c *Client) Perform(req *http.Request) (*http.Response, error) { | |
| if err != nil { | |
| // Record metrics, when enabled | |
| if c.metrics != nil { | |
| - c.metrics.Lock() | |
| - c.metrics.failures++ | |
| - c.metrics.Unlock() | |
| + c.metrics.failures.Add(1) | |
| } | |
| // Report the connection as unsuccessful | |
| - c.Lock() | |
| + c.mu.Lock() | |
| //nolint:errcheck // Questionable if the function even returns an error | |
| - c.pool.OnFailure(conn) | |
| - c.Unlock() | |
| + c.mu.pool.OnFailure(conn) | |
| + c.mu.Unlock() | |
| // Retry on EOF errors | |
| if errors.Is(err, io.EOF) { | |
| @@ -346,15 +404,13 @@ func (c *Client) Perform(req *http.Request) (*http.Response, error) { | |
| } | |
| } else { | |
| // Report the connection as succesfull | |
| - c.Lock() | |
| - c.pool.OnSuccess(conn) | |
| - c.Unlock() | |
| + c.mu.Lock() | |
| + c.mu.pool.OnSuccess(conn) | |
| + c.mu.Unlock() | |
| } | |
| if res != nil && c.metrics != nil { | |
| - c.metrics.Lock() | |
| - c.metrics.responses[res.StatusCode]++ | |
| - c.metrics.Unlock() | |
| + c.metrics.incrementResponse(res.StatusCode) | |
| } | |
| // Retry on configured response statuses | |
| @@ -412,7 +468,9 @@ func (c *Client) Perform(req *http.Request) (*http.Response, error) { | |
| // URLs returns a list of transport URLs. | |
| func (c *Client) URLs() []*url.URL { | |
| - return c.pool.URLs() | |
| + c.mu.RLock() | |
| + defer c.mu.RUnlock() | |
| + return c.mu.pool.URLs() | |
| } | |
| func (c *Client) setReqURL(u *url.URL, req *http.Request) { |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| diff --git a/opensearchtransport/opensearchtransport_internal_test.go b/opensearchtransport/opensearchtransport_internal_test.go | |
| index ae851a0..438e885 100644 | |
| --- a/opensearchtransport/opensearchtransport_internal_test.go | |
| +++ b/opensearchtransport/opensearchtransport_internal_test.go | |
| @@ -182,11 +182,11 @@ func TestTransportConnectionPool(t *testing.T) { | |
| t.Run("Single URL", func(t *testing.T) { | |
| tp, _ := New(Config{URLs: []*url.URL{{Scheme: "http", Host: "foo1"}}}) | |
| - if _, ok := tp.pool.(*singleConnectionPool); !ok { | |
| + if _, ok := tp.mu.pool.(*singleConnectionPool); !ok { | |
| t.Errorf("Expected connection to be singleConnectionPool, got: %T", tp) | |
| } | |
| - conn, err := tp.pool.Next() | |
| + conn, err := tp.mu.pool.Next() | |
| if err != nil { | |
| t.Errorf("Unexpected error: %s", err) | |
| } | |
| @@ -207,11 +207,11 @@ func TestTransportConnectionPool(t *testing.T) { | |
| {Scheme: "http", Host: "foo2"}, | |
| }}) | |
| - if _, ok := tp.pool.(*statusConnectionPool); !ok { | |
| + if _, ok := tp.mu.pool.(*statusConnectionPool); !ok { | |
| t.Errorf("Expected connection to be statusConnectionPool, got: %T", tp) | |
| } | |
| - conn, err = tp.pool.Next() | |
| + conn, err = tp.mu.pool.Next() | |
| if err != nil { | |
| t.Errorf("Unexpected error: %s", err) | |
| } | |
| @@ -219,7 +219,7 @@ func TestTransportConnectionPool(t *testing.T) { | |
| t.Errorf("Unexpected URL, want=foo1, got=%s", conn.URL) | |
| } | |
| - conn, err = tp.pool.Next() | |
| + conn, err = tp.mu.pool.Next() | |
| if err != nil { | |
| t.Errorf("Unexpected error: %s", err) | |
| } | |
| @@ -227,7 +227,7 @@ func TestTransportConnectionPool(t *testing.T) { | |
| t.Errorf("Unexpected URL, want=http://foo2, got=%s", conn.URL) | |
| } | |
| - conn, err = tp.pool.Next() | |
| + conn, err = tp.mu.pool.Next() | |
| if err != nil { | |
| t.Errorf("Unexpected error: %s", err) | |
| } | |
| @@ -276,22 +276,22 @@ func TestTransportCustomConnectionPool(t *testing.T) { | |
| }, | |
| }) | |
| - if _, ok := tp.pool.(*CustomConnectionPool); !ok { | |
| - t.Fatalf("Unexpected connection pool, want=CustomConnectionPool, got=%T", tp.pool) | |
| + if _, ok := tp.mu.pool.(*CustomConnectionPool); !ok { | |
| + t.Fatalf("Unexpected connection pool, want=CustomConnectionPool, got=%T", tp.mu.pool) | |
| } | |
| - conn, err := tp.pool.Next() | |
| + conn, err := tp.mu.pool.Next() | |
| if err != nil { | |
| t.Fatalf("Unexpected error: %s", err) | |
| } | |
| if conn.URL == nil { | |
| t.Errorf("Empty connection URL: %+v", conn) | |
| } | |
| - if err := tp.pool.OnFailure(conn); err != nil { | |
| + if err := tp.mu.pool.OnFailure(conn); err != nil { | |
| t.Errorf("Error removing the %q connection: %s", conn.URL, err) | |
| } | |
| - if len(tp.pool.URLs()) != 1 { | |
| - t.Errorf("Unexpected number of connections in pool: %q", tp.pool) | |
| + if len(tp.mu.pool.URLs()) != 1 { | |
| + t.Errorf("Unexpected number of connections in pool: %q", tp.mu.pool) | |
| } | |
| }) | |
| } | |
| @@ -1170,3 +1170,114 @@ func TestRequestSigning(t *testing.T) { | |
| } | |
| }) | |
| } | |
| + | |
| +func TestResolveDiscoveryTimeout(t *testing.T) { | |
| + tests := []struct { | |
| + name string | |
| + discoveryTimeout time.Duration | |
| + expectedDuration time.Duration | |
| + expectedEnabled bool | |
| + }{ | |
| + { | |
| + name: "zero value uses default timeout", | |
| + discoveryTimeout: 0, | |
| + expectedDuration: defaultDiscoveryTimeout, | |
| + expectedEnabled: true, | |
| + }, | |
| + { | |
| + name: "positive value uses configured timeout", | |
| + discoveryTimeout: 45 * time.Second, | |
| + expectedDuration: 45 * time.Second, | |
| + expectedEnabled: true, | |
| + }, | |
| + { | |
| + name: "negative value disables timeout", | |
| + discoveryTimeout: -1 * time.Second, | |
| + expectedDuration: 0, // Duration is not meaningful when timeout is disabled | |
| + expectedEnabled: false, | |
| + }, | |
| + { | |
| + name: "large negative value disables timeout", | |
| + discoveryTimeout: -10 * time.Minute, | |
| + expectedDuration: 0, | |
| + expectedEnabled: false, | |
| + }, | |
| + { | |
| + name: "very small positive value", | |
| + discoveryTimeout: 1 * time.Millisecond, | |
| + expectedDuration: 1 * time.Millisecond, | |
| + expectedEnabled: true, | |
| + }, | |
| + { | |
| + name: "very large positive value", | |
| + discoveryTimeout: 24 * time.Hour, | |
| + expectedDuration: 24 * time.Hour, | |
| + expectedEnabled: true, | |
| + }, | |
| + } | |
| + | |
| + for _, tt := range tests { | |
| + t.Run(tt.name, func(t *testing.T) { | |
| + client := &Client{ | |
| + discoveryTimeout: tt.discoveryTimeout, | |
| + } | |
| + | |
| + duration, enabled := client.resolveDiscoveryTimeout() | |
| + | |
| + if duration != tt.expectedDuration { | |
| + t.Errorf("Expected duration %v, got %v", tt.expectedDuration, duration) | |
| + } | |
| + | |
| + if enabled != tt.expectedEnabled { | |
| + t.Errorf("Expected enabled %v, got %v", tt.expectedEnabled, enabled) | |
| + } | |
| + }) | |
| + } | |
| +} | |
| + | |
| +func TestConfigDiscoveryTimeout(t *testing.T) { | |
| + tests := []struct { | |
| + name string | |
| + configTimeout time.Duration | |
| + expectedClientTimeout time.Duration | |
| + }{ | |
| + { | |
| + name: "zero config timeout", | |
| + configTimeout: 0, | |
| + expectedClientTimeout: 0, | |
| + }, | |
| + { | |
| + name: "positive config timeout", | |
| + configTimeout: 30 * time.Second, | |
| + expectedClientTimeout: 30 * time.Second, | |
| + }, | |
| + { | |
| + name: "negative config timeout", | |
| + configTimeout: -5 * time.Second, | |
| + expectedClientTimeout: -5 * time.Second, | |
| + }, | |
| + } | |
| + | |
| + for _, tt := range tests { | |
| + t.Run(tt.name, func(t *testing.T) { | |
| + urls := []*url.URL{ | |
| + {Scheme: "http", Host: "localhost:9200"}, | |
| + } | |
| + | |
| + cfg := Config{ | |
| + URLs: urls, | |
| + DiscoveryTimeout: tt.configTimeout, | |
| + } | |
| + | |
| + client, err := New(cfg) | |
| + if err != nil { | |
| + t.Fatalf("Failed to create client: %v", err) | |
| + } | |
| + | |
| + if client.discoveryTimeout != tt.expectedClientTimeout { | |
| + t.Errorf("Expected client.discoveryTimeout %v, got %v", | |
| + tt.expectedClientTimeout, client.discoveryTimeout) | |
| + } | |
| + }) | |
| + } | |
| +} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| diff --git a/USER_GUIDE.md b/USER_GUIDE.md | |
| index edef4d3..9d217fa 100644 | |
| --- a/USER_GUIDE.md | |
| +++ b/USER_GUIDE.md | |
| @@ -396,4 +396,5 @@ func getCredentialProvider(accessKey, secretAccessKey, token string) aws.Credent | |
| - [Advanced Index Actions](guides/advanced_index_actions.md) | |
| - [Index Templates](guides/index_template.md) | |
| - [Data Streams](guides/data_streams.md) | |
| +- [Node Discovery and Role Management](guides/node_discovery_and_roles.md) | |
| - [Retry and Backoff](guides/retry_backoff.md) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment