Add state-store backed port allocation #159

Draft
danegsta wants to merge 7 commits into main from danegsta/port-allocation-plan

Conversation

@danegsta

Member

DCP currently picks free ports by probing random ephemeral ports and recording recent choices in an MRU file, which still leaves a timing window where another process can claim the port after the probe closes. This change adds a state-store-backed allocation model so DCP instances can coordinate non-ephemeral child-process port reservations through SQLite while keeping the legacy MRU allocator as an isolated fallback.

Summary

  • Add a port_allocations state-store table and reservation APIs with normalized packed IP addresses, owner identity tracking, inactive-owner cleanup, and wildcard address conflict handling.
  • Split networking allocation into a public dispatcher plus separate state-store and MRU allocator implementations.
  • Add configurable state-store allocation ranges, default non-ephemeral scanning, local bind-probe/reservation serialization, and context-aware allocation/reserve/release APIs.
  • Wire controller startup, service proxy allocation, declared service ports, endpoint reservations, templating, and kubeconfig generation through the updated allocator APIs.
  • Release state-store reservations when service proxies stop, endpoints are cleaned up, or services are deleted.

Notes for reviewers

The state-store path and legacy MRU path intentionally do not share persistence. State-store allocation writes only SQLite reservations; MRU fallback writes only the MRU file. The new range scanner uses a deterministic randomized stride over the configured range instead of precomputing candidates.
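
For readers unfamiliar with the stride approach, here is a minimal sketch of the idea, not the code in this PR. It assumes a single contiguous range and an FNV hash; `strideIterator` and `newStrideIterator` are hypothetical names, and the PR's own iterator takes several ranges plus protocol and address.

```go
package main

import "hash/fnv"

// gcd returns the greatest common divisor of two positive integers.
func gcd(a, b int) int {
	for b != 0 {
		a, b = b, a%b
	}
	return a
}

// strideIterator yields each port of one contiguous range exactly once,
// without materializing the candidate list. Hypothetical type for this sketch.
type strideIterator struct {
	first, total, pos, step, emitted int
}

// newStrideIterator derives the start offset and the step from a hash of seed
// (for example protocol plus address), so the same seed always produces the
// same visiting order.
func newStrideIterator(first, last int, seed string) *strideIterator {
	total := last - first + 1
	if total <= 0 {
		return &strideIterator{}
	}
	h := fnv.New64a() // assumption: any stable 64-bit hash would do
	_, _ = h.Write([]byte(seed))
	sum := h.Sum64()

	step := int((sum / uint64(total)) % uint64(total))
	if step == 0 {
		step = 1
	}
	// A step coprime with the range size visits every slot before repeating.
	for gcd(step, total) != 1 {
		step++
	}
	return &strideIterator{first: first, total: total, pos: int(sum % uint64(total)), step: step}
}

// Next returns the next candidate port, or false once the range is exhausted.
func (it *strideIterator) Next() (int, bool) {
	if it.emitted >= it.total {
		return 0, false
	}
	port := it.first + it.pos
	it.pos = (it.pos + it.step) % it.total
	it.emitted++
	return port, true
}
```

Because the step is coprime with the range size, the walk is a permutation of the range, so memory stays constant no matter how wide the configured range is.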

Validation

  • make test-prereqs && go test -count 1 -parallel 32 ./internal/networking ./internal/statestore ./controllers
  • make test
  • make lint

@karolz-ms left a comment

Collaborator

I am still reviewing the whole PR, but need to take a break and don't want to lose the comments I already made, so here they are.

Comment thread internal/networking/port_allocator_statestore.go Outdated
return allLocalIps, nil
}

func GetEphemeralPortRange() (int, int, bool) {

Collaborator

I know this is not part of your change, but we should add a comment to this public API describing what the "matched" bool return value means.

Comment thread internal/networking/port_allocator_statestore.go Outdated
_, _ = hash.Write([]byte(address))
hashValue := hash.Sum64()
offset := int(hashValue % uint64(total))
step := stateStorePortAllocationCandidateStep(total, hashValue/uint64(total))

Collaborator

Do you really want the hash value to be divided by the total here? If yes, why is this calculation done here as opposed to the body of the step-calculating function?

candidates := newStateStorePortAllocationCandidateIterator(config.portRanges, string(protocol), address)
for {
if ctxErr := allocationCtx.Err(); ctxErr != nil {
return 0, ctxErr, config.mode == portAllocatorModeStateStoreWithFallback

Collaborator

Nit: we can probably just compute shouldFallback once before entering the for loop, instead of repeating config.mode == portAllocatorModeStateStoreWithFallback here and below. Not for performance, just to make the code a bit more readable.
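
Roughly what I have in mind, as a sketch only. The names below (`allocateFromCandidates`, `allocatorMode`, `errNoFreePort`) are made up for illustration; `cancelled` stands in for the context check and `isFree` for the probe-and-reserve step.

```go
package main

import "errors"

type allocatorMode int

const (
	modeStateStoreOnly allocatorMode = iota
	modeStateStoreWithFallback
)

var errNoFreePort = errors.New("no free port in the configured range")

// allocateFromCandidates walks candidates until one is free. cancelled stands
// in for the context check in the loop; isFree stands in for probe + reserve.
func allocateFromCandidates(
	mode allocatorMode,
	candidates []int,
	cancelled func() error,
	isFree func(port int) bool,
) (port int, shouldFallback bool, err error) {
	// Computed once, then reused at every failure exit.
	shouldFallback = mode == modeStateStoreWithFallback
	for _, candidate := range candidates {
		if cancelErr := cancelled(); cancelErr != nil {
			return 0, shouldFallback, cancelErr
		}
		if isFree(candidate) {
			return candidate, false, nil
		}
	}
	return 0, shouldFallback, errNoFreePort
}
```

The sketch also puts the error last in the return list, which keeps call sites in the usual `if err != nil` shape.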

Comment thread internal/networking/networking.go Outdated
Comment thread internal/networking/networking.go Outdated
Comment thread internal/networking/port_allocator_statestore.go Outdated
Comment thread internal/networking/port_allocator_statestore.go
Comment thread internal/networking/port_allocator.go
Comment thread internal/statestore/migrations/000002_port_allocations.up.sql Outdated
Comment thread internal/statestore/ports.go Outdated
Comment thread internal/statestore/ports.go Outdated
Comment thread internal/statestore/ports.go Outdated
for _, conflictingReservation := range inactiveReservations {
deleteErr := deletePortReservation(ctx, conn, conflictingReservation)
if deleteErr != nil {
return deleteErr

Collaborator

Should the inability to delete an inactive reservation cause the new reservation to fail?

Comment thread internal/statestore/ports.go Outdated
Comment thread controllers/service_controller.go Outdated
Comment thread controllers/service_controller.go Outdated
Comment thread controllers/service_controller.go Outdated
Comment thread controllers/service_controller.go Outdated
releaseErr = errors.Join(releaseErr, proxyReleaseErr)
}
}
return releaseErr

Collaborator

I am not convinced that the failure to release a port needs to be handled in any special way beyond logging it. The other mechanisms we have (reaping orphaned port reservations in particular) should be enough to keep the user's machine usable.
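
To illustrate the log-only shape, a rough sketch. `memoryStore` is a stand-in for the state store and `releaseAllBestEffort` is a hypothetical name; the real function would also stop the proxies and use the controller's logger.

```go
package main

import "fmt"

// memoryStore is a stand-in for the state store, used only by this sketch.
type memoryStore struct {
	reserved map[int]bool
}

// Release drops a reservation and fails if the port was not reserved.
func (s *memoryStore) Release(port int) error {
	if !s.reserved[port] {
		return fmt.Errorf("port %d is not reserved", port)
	}
	delete(s.reserved, port)
	return nil
}

// releaseAllBestEffort releases every port and never returns an error:
// failures are logged and left for orphan reaping to clean up.
func releaseAllBestEffort(store *memoryStore, ports []int, logf func(format string, args ...any)) {
	for _, port := range ports {
		if err := store.Release(port); err != nil {
			logf("could not release port reservation %d, leaving it to orphan reaping: %v", port, err)
		}
	}
}
```

With no error to propagate, callers no longer need a branch for release failures.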


releaseExistingErr := stopProxiesAndReleasePortReservations(ctx, psd.proxies, log)
psd.proxiesStopped = len(psd.proxies) > 0
if releaseExistingErr != nil {

Collaborator

See my comment on stopProxiesAndReleasePortReservations()--I question whether we should fail if port reservations cannot be released.

if portAllocationErr != nil {
stopProxies(proxies, log)
releaseErr := stopProxiesAndReleasePortReservations(ctx, proxies, log)
if releaseErr != nil {

Collaborator

Again, consider making stopProxiesAndReleasePortReservations a "never fails" method.


func (r *ServiceReconciler) stopService(svcName types.NamespacedName, svc *apiv1.Service, log logr.Logger) {
serviceData, found := r.serviceInfo.LoadAndDelete(svcName)
func reserveDeclaredServicePort(ctx context.Context, svc *apiv1.Service, log logr.Logger) objectChange {

@karolz-ms, Jun 4, 2026

Collaborator

Not sure if we need this?

If the service (Spec.Port) is using a specific (nonzero) port, then the user took responsibility for selecting that particular port and we should neither "reserve" nor "release" it. It is semantically different from the case of "the user asked for random port and we decided to use 1234, therefore 1234 is reserved by current DCP instance". The only concern with this scenario (user-chosen port) is #26 and we should fix it, but perhaps via a separate PR, and it probably calls for a separate kind of "port reservation".

On the other hand, if the port is 0 (the Service should use a random port), then trying to "reserve" or "release" it should be a no-op and a waste of time.

So it seems in either case what this method does is not what we want.

}
}

func (r *ServiceReconciler) releaseServiceEndpointPortReservations(ctx context.Context, svc *apiv1.Service, log logr.Logger) objectChange {

@karolz-ms, Jun 4, 2026

Collaborator

This is more than unnecessary; it is harmful (as part of the service controller).

Whatever created the Endpoint should be responsible for deleting the Endpoint and releasing any port reservations it might have made as part of creating the Endpoint.

* Licensed under the MIT License. See LICENSE in the project root for license information.
*--------------------------------------------------------------------------------------------*/

package controllers

Collaborator

This should go into the test/integration package.

if len(ips) == 0 {
return "", fmt.Errorf("could not obtain valid IP address(es) for 'localhost': %w", err)
}
requestedServiceAddress = networking.IpToString(ips[rand.Intn(len(ips))])

Collaborator

This makes the result of getRequestedServiceAddress() non-deterministic, which might cause us to take multiple port reservations for a single Service instance (e.g. because of multiple reconciliations). I would try to make the result deterministic, e.g. by choosing the IP based on a hash of the service name, or something along these lines.
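
Something along these lines, as a sketch. `pickAddress` is a hypothetical helper that works on already-stringified addresses; the key is assumed to be the namespaced service name, and FNV is just one stable hash that would work.

```go
package main

import (
	"hash/fnv"
	"sort"
)

// pickAddress returns the same element of addresses for the same key,
// regardless of the order in which the resolver listed them.
func pickAddress(key string, addresses []string) string {
	if len(addresses) == 0 {
		return ""
	}
	sorted := append([]string(nil), addresses...)
	sort.Strings(sorted) // resolver order is not guaranteed to be stable

	h := fnv.New32a()
	_, _ = h.Write([]byte(key))
	return sorted[int(h.Sum32()%uint32(len(sorted)))]
}
```

Repeated reconciliations of one Service then resolve to the same address as long as the set of localhost addresses is unchanged, while different services still spread across the available addresses.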

change = r.stopService(ctx, svc.NamespacedName(), &svc, log)
change |= releaseDeclaredServicePort(ctx, &svc, log)
change |= r.releaseServiceEndpointPortReservations(ctx, &svc, log)
if change == noChange {

Collaborator

releaseDeclaredServicePort, releaseServiceEndpointPortReservations, and stopProxiesAndReleasePortReservations all return additionalReconciliationNeeded on any error. The finalizer stays, the Service is undeletable from the cluster, and the controller burns CPU retrying. If state-store is permanently degraded the operator has no way out. Cap the retry attempts (after which we log and proceed), or distinguish transient vs. terminal errors.
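
One way to cap the retries, as a sketch under assumptions: `retryBudget` is a hypothetical type keyed by the namespaced service name, and the limit is whatever the controller decides is reasonable.

```go
package main

import "sync"

// retryBudget caps how many times cleanup for one object is retried.
// Hypothetical type for this sketch.
type retryBudget struct {
	mu       sync.Mutex
	limit    int
	attempts map[string]int
}

func newRetryBudget(limit int) *retryBudget {
	return &retryBudget{limit: limit, attempts: make(map[string]int)}
}

// RecordFailure notes one failed attempt for key and reports whether the
// caller should retry. Once the budget is spent it returns false and forgets
// the key, so the caller can log the error and let the finalizer go.
func (b *retryBudget) RecordFailure(key string) bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.attempts[key]++
	if b.attempts[key] >= b.limit {
		delete(b.attempts, key)
		return false
	}
	return true
}
```

The deletion path would request another reconciliation only while `RecordFailure` returns true. A successful release should also clear the key, which this sketch leaves out.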

var svc apiv1.Service
if getErr := client.Get(ctx, serviceProducer.ServiceNamespacedName(), &svc); getErr != nil {
if apierrors.IsNotFound(getErr) {
return nil

Collaborator

I think the caller needs to learn about this error... or we skip the service existence check, delete this utility and make the callers call networking.ReserveSpecificPort directly.

}, false, nil
}

func allocatePortFromStateStoreRange(ctx context.Context, protocol apiv1.PortProtocol, address string, log logr.Logger) (int32, error, bool) {

Collaborator

More than one function in this file does not return the error as the last return value, which is non-idiomatic Go.

`SELECT protocol, address, port,
owner_pid, owner_identity_time, updated_at_unix_nano
FROM port_allocations
WHERE protocol = ? AND port = ?`,

Collaborator

Shouldn't it also check the address?

As written here, this will do a full table scan, because the index is on (protocol, address, port).

allocationTimeout time.Duration
}

func getStateStorePortAllocationConfig(log logr.Logger) (*stateStorePortAllocationConfig, bool, error) {

Collaborator

The result of this should probably be cached, because it is called on every allocate/check/reserve/release call.
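
A minimal caching sketch, assuming the configuration does not change during the life of the process. `cachedRangeConfig` and `portRange` are hypothetical names, and `load` stands in for whatever the function reads today.

```go
package main

import "sync"

// portRange is an inclusive range of candidate ports.
type portRange struct{ first, last int }

// cachedRangeConfig loads the configuration on first use and then serves the
// stored result. Hypothetical type for this sketch.
type cachedRangeConfig struct {
	once sync.Once
	load func() (portRange, error)
	cfg  portRange
	err  error
}

// Get returns the cached configuration, invoking load at most once even when
// called concurrently. A load error is cached as well.
func (c *cachedRangeConfig) Get() (portRange, error) {
	c.once.Do(func() { c.cfg, c.err = c.load() })
	return c.cfg, c.err
}
```

Caching the error too means a bad configuration is reported once and consistently. If the configuration is meant to be re-read at runtime, this would need an explicit invalidation path instead.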

Comment thread internal/statestore/ports.go Outdated
if cleanupErr := stateStore.DeleteInactiveResourceLeases(ctrlCtx); cleanupErr != nil {
log.Error(cleanupErr, "Failed to clean up inactive state store resource leases")
}
if cleanupErr := stateStore.DeleteInactivePortReservations(ctrlCtx); cleanupErr != nil {

Collaborator

Let's do this (and maybe the resource leases too) in a separate goroutine. Determining inactive port reservations involves process discovery, which is slow.
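
A sketch of the shape, with hypothetical names. Each step would be a closure over the state store call and the controller context; the returned channel is only there so tests or shutdown code can wait for completion.

```go
package main

// runCleanupInBackground runs the cleanup steps in order on their own
// goroutine so that startup is not blocked by slow process discovery.
// Failures are logged and do not stop later steps.
func runCleanupInBackground(steps []func() error, logf func(format string, args ...any)) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done) // signals that every step has finished
		for _, step := range steps {
			if err := step(); err != nil {
				logf("state store cleanup step failed: %v", err)
			}
		}
	}()
	return done
}
```

Since startup no longer waits for the cleanup, an allocation made early may still run into stale reservations, so the inactive-owner handling on the reservation path still has to cover that case.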

Comment thread internal/statestore/ports.go Outdated
Comment thread internal/statestore/ports.go Outdated
return reservations, nil
}

func portReservationAddressesConflict(requestedAddress []byte, reservedAddress []byte) bool {

Collaborator

portReservationAddressesConflict short-circuits on length mismatch, but the conflict message blames "another DCP process." When the same DCP process has a wildcard reservation (e.g. 0.0.0.0:1234) and the same process later calls ReserveSpecificPort for 192.168.x.y:1234, the same-owner check in reservePort (reuseSameOwner && sameOwner && exactAddress) doesn't apply because the addresses differ — so the code falls into the active = resourceLeaseOwnerIsActive(...) branch (true, it's us) and returns ErrPortReservationHeld, which the allocator surfaces as "port %d is already reserved by another DCP process". The message is wrong in this case and operationally confusing. Detect same-owner and emit a clearer message ("conflicts with this process's own wildcard reservation").
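
A sketch of the message selection. The types and `describeConflict` are hypothetical; the owner fields mirror the owner_pid and owner_identity_time columns, and the exact wording is only a suggestion.

```go
package main

import "fmt"

// ownerIdentity mirrors the owner_pid and owner_identity_time columns.
type ownerIdentity struct {
	pid          int
	identityTime int64
}

// heldReservation is the existing row that blocks the requested port.
type heldReservation struct {
	owner   ownerIdentity
	address string
}

// describeConflict picks the error message based on who holds the blocking
// reservation, so a self-conflict is not blamed on another process.
func describeConflict(port int, self ownerIdentity, held heldReservation) error {
	if held.owner == self {
		return fmt.Errorf("port %d conflicts with this process's own reservation for %s", port, held.address)
	}
	return fmt.Errorf("port %d is already reserved by another DCP process", port)
}
```

Comparing both pid and identity time avoids treating a recycled pid as the same owner.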

danegsta and others added 3 commits June 10, 2026 16:11
Replace fragile random ephemeral port selection with a configurable state-store backed allocation path while keeping the MRU allocator as an isolated fallback. The new allocator stores normalized IP reservations, handles wildcard address conflicts, serializes local bind probes with reservation writes, and cleans up reservations for stopped service proxies and deleted service endpoints.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Release pre-reserved endpoint ports when Endpoint persistence fails and prevent stopped proxy data from being reused after reservation release errors.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Explain why the state-store allocator defaults to 20000-32767.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
danegsta and others added 4 commits June 10, 2026 16:11
Define named constants for the state-store allocator's default scanning range.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Avoid forcing unmatched Windows and macOS ephemeral fallback ranges to Linux's default minimum.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@danegsta force-pushed the danegsta/port-allocation-plan branch from 58e497e to 7909b66 on June 10, 2026 23:12