Add state-store backed port allocation#159
Conversation
karolz-ms
left a comment
There was a problem hiding this comment.
I am still reviewing the whole PR, but need to take a break and don't want to lose the comments I already made, so here they are.
| return allLocalIps, nil | ||
| } | ||
|
|
||
| func GetEphemeralPortRange() (int, int, bool) { |
There was a problem hiding this comment.
I know this is not part of your change, but we should add a comment to this public API describing what the "matched" bool return value is
| _, _ = hash.Write([]byte(address)) | ||
| hashValue := hash.Sum64() | ||
| offset := int(hashValue % uint64(total)) | ||
| step := stateStorePortAllocationCandidateStep(total, hashValue/uint64(total)) |
There was a problem hiding this comment.
Do you really want the hash value to be divided by the total here? If yes, why is this calculation done here as opposed to the body of the step-calculating function?
| candidates := newStateStorePortAllocationCandidateIterator(config.portRanges, string(protocol), address) | ||
| for { | ||
| if ctxErr := allocationCtx.Err(); ctxErr != nil { | ||
| return 0, ctxErr, config.mode == portAllocatorModeStateStoreWithFallback |
There was a problem hiding this comment.
Nit: we can probably just compute shouldFallback (instead of using config.mode == portAllocatorModeStateStoreWithFallback here and below) before going into for loop. Not for performance, but just to make the code a bit more readable.
| for _, conflictingReservation := range inactiveReservations { | ||
| deleteErr := deletePortReservation(ctx, conn, conflictingReservation) | ||
| if deleteErr != nil { | ||
| return deleteErr |
There was a problem hiding this comment.
Should the inability to delete inactive reservation cause the new reservation to fail?
| releaseErr = errors.Join(releaseErr, proxyReleaseErr) | ||
| } | ||
| } | ||
| return releaseErr |
There was a problem hiding this comment.
I am not convinced that the failure to release a port needs to be handled in any special way beyond logging it. The other mechanisms we have (reaping orphaned port reservations in particular) should be enough to keep the user machine usable.
|
|
||
| releaseExistingErr := stopProxiesAndReleasePortReservations(ctx, psd.proxies, log) | ||
| psd.proxiesStopped = len(psd.proxies) > 0 | ||
| if releaseExistingErr != nil { |
There was a problem hiding this comment.
See my comment on stopProxiesAndReleasePortReservations()--I question whether we should fail if port reservations cannot be released.
| if portAllocationErr != nil { | ||
| stopProxies(proxies, log) | ||
| releaseErr := stopProxiesAndReleasePortReservations(ctx, proxies, log) | ||
| if releaseErr != nil { |
There was a problem hiding this comment.
Again, consider making stopProxiesAndReleasePortReservations a "never fails" method
|
|
||
| func (r *ServiceReconciler) stopService(svcName types.NamespacedName, svc *apiv1.Service, log logr.Logger) { | ||
| serviceData, found := r.serviceInfo.LoadAndDelete(svcName) | ||
| func reserveDeclaredServicePort(ctx context.Context, svc *apiv1.Service, log logr.Logger) objectChange { |
There was a problem hiding this comment.
Not sure if we need this?
If the service (Spec.Port) is using a specific (nonzero) port, then the user took responsibility for selecting that particular port and we should neither "reserve" nor "release" it. It is semantically different from the case of "the user asked for random port and we decided to use 1234, therefore 1234 is reserved by current DCP instance". The only concern with this scenario (user-chosen port) is #26 and we should fix it, but perhaps via a separate PR, and it probably calls for a separate kind of "port reservation".
On the other hand, if the port is 0 (Service should use random port), then trying to "reserve" or "release" it should be a no-op and waste of time.
So it seems in either case what this method does is not what we want.
| } | ||
| } | ||
|
|
||
| func (r *ServiceReconciler) releaseServiceEndpointPortReservations(ctx context.Context, svc *apiv1.Service, log logr.Logger) objectChange { |
There was a problem hiding this comment.
This is more than unnecessary, this is harmful (as part of service controller).
Whatever created the Endpoint should be responsible for deleting the Endpoint and releasing any port reservations it might have made as part of creating the Endpoint.
| * Licensed under the MIT License. See LICENSE in the project root for license information. | ||
| *--------------------------------------------------------------------------------------------*/ | ||
|
|
||
| package controllers |
There was a problem hiding this comment.
This should go into test/integration package.
| if len(ips) == 0 { | ||
| return "", fmt.Errorf("could not obtain valid IP address(es) for 'localhost': %w", err) | ||
| } | ||
| requestedServiceAddress = networking.IpToString(ips[rand.Intn(len(ips))]) |
There was a problem hiding this comment.
This makes result of getRequestedServiceAddress() non-deterministic, which might cause us take multiple port reservations per single Service instance (e.g. because of multiple reconciliations). I would try to make the result deterministic e.g. choose the IP based on the hash of service name or something along these lines.
| change = r.stopService(ctx, svc.NamespacedName(), &svc, log) | ||
| change |= releaseDeclaredServicePort(ctx, &svc, log) | ||
| change |= r.releaseServiceEndpointPortReservations(ctx, &svc, log) | ||
| if change == noChange { |
There was a problem hiding this comment.
releaseDeclaredServicePort, releaseServiceEndpointPortReservations, and stopProxiesAndReleasePortReservations all return additionalReconciliationNeeded on any error. The finalizer stays, the Service is undeletable from the cluster, and the controller burns CPU retrying. If state-store is permanently degraded the operator has no way out. Cap the retry attempts (after which we log and proceed), or distinguish transient vs. terminal errors.
| var svc apiv1.Service | ||
| if getErr := client.Get(ctx, serviceProducer.ServiceNamespacedName(), &svc); getErr != nil { | ||
| if apierrors.IsNotFound(getErr) { | ||
| return nil |
There was a problem hiding this comment.
I think the caller needs to learn about this error... or we skip the service existence check, delete this utility and make the callers call networking.ReserveSpecificPort directly.
| }, false, nil | ||
| } | ||
|
|
||
| func allocatePortFromStateStoreRange(ctx context.Context, protocol apiv1.PortProtocol, address string, log logr.Logger) (int32, error, bool) { |
There was a problem hiding this comment.
More than one function in this file do not return the error as the last return value (non-idiomatic Go).
| `SELECT protocol, address, port, | ||
| owner_pid, owner_identity_time, updated_at_unix_nano | ||
| FROM port_allocations | ||
| WHERE protocol = ? AND port = ?`, |
There was a problem hiding this comment.
Shouldn't it also check the address?
As written here this will do full table scan because the index is on (protocol, address, port)
| allocationTimeout time.Duration | ||
| } | ||
|
|
||
| func getStateStorePortAllocationConfig(log logr.Logger) (*stateStorePortAllocationConfig, bool, error) { |
There was a problem hiding this comment.
The result of this should probably be cached, because it is called on every allocate/check/reserve/release call.
| if cleanupErr := stateStore.DeleteInactiveResourceLeases(ctrlCtx); cleanupErr != nil { | ||
| log.Error(cleanupErr, "Failed to clean up inactive state store resource leases") | ||
| } | ||
| if cleanupErr := stateStore.DeleteInactivePortReservations(ctrlCtx); cleanupErr != nil { |
There was a problem hiding this comment.
Let's do this (and maybe the resource leases too) in a separate goroutine. Determining inactive port reservations involves process discovery which is slow
| return reservations, nil | ||
| } | ||
|
|
||
| func portReservationAddressesConflict(requestedAddress []byte, reservedAddress []byte) bool { |
There was a problem hiding this comment.
portReservationAddressesConflict short-circuits on length mismatch, but the conflict message blames "another DCP process." When the same DCP process has a wildcard reservation (e.g. 0.0.0.0:1234) and the same process later calls ReserveSpecificPort for 192.168.x.y:1234, the same-owner check in reservePort (reuseSameOwner && sameOwner && exactAddress) doesn't apply because the addresses differ — so the code falls into the active = resourceLeaseOwnerIsActive(...) branch (true, it's us) and returns ErrPortReservationHeld, which the allocator surfaces as "port %d is already reserved by another DCP process". The message is wrong in this case and operationally confusing. Detect same-owner and emit a clearer message ("conflicts with this process's own wildcard reservation").
Replace fragile random ephemeral port selection with a configurable state-store backed allocation path while keeping the MRU allocator as an isolated fallback. The new allocator stores normalized IP reservations, handles wildcard address conflicts, serializes local bind probes with reservation writes, and cleans up reservations for stopped service proxies and deleted service endpoints. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Release pre-reserved endpoint ports when Endpoint persistence fails and prevent stopped proxy data from being reused after reservation release errors. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Explain why the state-store allocator defaults to 20000-32767. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Define named constants for the state-store allocator's default scanning range. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Avoid forcing unmatched Windows and macOS ephemeral fallback ranges to Linux's default minimum. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
58e497e to
7909b66
Compare
DCP currently picks free ports by probing random ephemeral ports and recording recent choices in an MRU file, which still leaves a timing window where another process can claim the port after the probe closes. This change adds a state-store-backed allocation model so DCP instances can coordinate non-ephemeral child-process port reservations through SQLite while keeping the legacy MRU allocator as an isolated fallback.
Summary
port_allocationsstate-store table and reservation APIs with normalized packed IP addresses, owner identity tracking, inactive-owner cleanup, and wildcard address conflict handling.Notes for reviewers
The state-store path and legacy MRU path intentionally do not share persistence. State-store allocation writes only SQLite reservations; MRU fallback writes only the MRU file. The new range scanner uses a deterministic randomized stride over the configured range instead of precomputing candidates.
Validation
make test-prereqs && go test -count 1 -parallel 32 ./internal/networking ./internal/statestore ./controllersmake testmake lint