From 4e8d3ba019681a937d9642c07392bc04158ca857 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Fri, 19 Jun 2026 13:27:12 +0200 Subject: [PATCH 1/4] fix(da): fix polling fallback when ws not available --- CHANGELOG.md | 6 ++ apps/evm/server/force_inclusion_test.go | 2 + .../internal/da/async_block_retriever_test.go | 10 ++ block/internal/da/client.go | 8 ++ block/internal/da/interface.go | 5 + block/internal/da/subscriber.go | 71 +++++++++++- block/internal/da/tracing.go | 3 + block/internal/da/tracing_test.go | 1 + block/internal/syncing/syncer_test.go | 1 + pkg/da/jsonrpc/client.go | 102 ++++++++++++++++-- test/mocks/da.go | 46 +++++++- test/testda/dummy.go | 4 + 12 files changed, 248 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2cb0126524..0849ae81c1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## v1.1.4 + +### Fixed + +- DA client falls back to HTTP polling with `Retrieve` when the WebSocket connection fails, instead of trying to use the WS-only `Subscribe` over HTTP. Automatically upgrade to WS is available [#3211](https://github.com/evstack/ev-node/pull/3361) + ## v1.1.3 ### Fixed diff --git a/apps/evm/server/force_inclusion_test.go b/apps/evm/server/force_inclusion_test.go index 8b05cad689..df08473d73 100644 --- a/apps/evm/server/force_inclusion_test.go +++ b/apps/evm/server/force_inclusion_test.go @@ -85,6 +85,8 @@ func (m *mockDA) HasForcedInclusionNamespace() bool { return true } +func (m *mockDA) SupportsSubscribe() bool { return true } + func (m *mockDA) GetLatestDAHeight(_ context.Context) (uint64, error) { return 0, nil } diff --git a/block/internal/da/async_block_retriever_test.go b/block/internal/da/async_block_retriever_test.go index d9e4613988..6bfcb57fef 100644 --- a/block/internal/da/async_block_retriever_test.go +++ b/block/internal/da/async_block_retriever_test.go @@ -53,6 +53,8 @@ func TestAsyncBlockRetriever_SubscriptionDrivenCaching(t *testing.T) { client := &mocks.MockClient{} fiNs := datypes.NamespaceFromString("test-fi-ns").Bytes() + client.On("SupportsSubscribe").Return(true) + // Create a subscription channel that delivers one event then blocks. subCh := make(chan datypes.SubscriptionEvent, 1) subCh <- datypes.SubscriptionEvent{ @@ -104,6 +106,8 @@ func TestAsyncBlockRetriever_CatchupFillsGaps(t *testing.T) { client := &mocks.MockClient{} fiNs := datypes.NamespaceFromString("test-fi-ns").Bytes() + client.On("SupportsSubscribe").Return(true) + // Subscription delivers height 105 (no blobs — just a signal). subCh := make(chan datypes.SubscriptionEvent, 1) subCh <- datypes.SubscriptionEvent{Height: 105} @@ -153,6 +157,8 @@ func TestAsyncBlockRetriever_HeightFromFuture(t *testing.T) { client := &mocks.MockClient{} fiNs := datypes.NamespaceFromString("test-fi-ns").Bytes() + client.On("SupportsSubscribe").Return(true) + // Subscription delivers height 100 with no blobs. subCh := make(chan datypes.SubscriptionEvent) client.On("Subscribe", mock.Anything, fiNs, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(subCh), nil).Once() @@ -187,6 +193,8 @@ func TestAsyncBlockRetriever_StopGracefully(t *testing.T) { client := &mocks.MockClient{} fiNs := datypes.NamespaceFromString("test-fi-ns").Bytes() + client.On("SupportsSubscribe").Return(true) + blockCh := make(chan datypes.SubscriptionEvent) client.On("Subscribe", mock.Anything, fiNs, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(blockCh), nil).Maybe() client.On("Retrieve", mock.Anything, mock.Anything, fiNs).Return(datypes.ResultRetrieve{ @@ -211,6 +219,8 @@ func TestAsyncBlockRetriever_ReconnectOnSubscriptionError(t *testing.T) { client := &mocks.MockClient{} fiNs := datypes.NamespaceFromString("test-fi-ns").Bytes() + client.On("SupportsSubscribe").Return(true) + // First subscription closes immediately (simulating error). closedCh := make(chan datypes.SubscriptionEvent) close(closedCh) diff --git a/block/internal/da/client.go b/block/internal/da/client.go index 455d151d9f..b16950ce21 100644 --- a/block/internal/da/client.go +++ b/block/internal/da/client.go @@ -38,6 +38,7 @@ type client struct { dataNamespaceBz []byte forcedNamespaceBz []byte hasForcedNamespace bool + isWebSocket bool timestampCache *blockTimestampCache } @@ -137,6 +138,7 @@ func NewClient(cfg Config) FullClient { dataNamespaceBz: datypes.NamespaceFromString(cfg.DataNamespace).Bytes(), forcedNamespaceBz: forcedNamespaceBz, hasForcedNamespace: hasForcedNamespace, + isWebSocket: cfg.DA.IsWebSocket, timestampCache: newBlockTimestampCache(blockTimestampCacheWindow), } } @@ -485,6 +487,12 @@ func (c *client) HasForcedInclusionNamespace() bool { return c.hasForcedNamespace } +// SupportsSubscribe reports whether the underlying transport supports +// channel-based subscriptions (WebSocket). +func (c *client) SupportsSubscribe() bool { + return c.isWebSocket +} + // Subscribe subscribes to blobs in the given namespace via the celestia-node // Subscribe API. It returns a channel that emits a SubscriptionEvent for every // DA block containing a matching blob. The channel is closed when ctx is diff --git a/block/internal/da/interface.go b/block/internal/da/interface.go index f1272087f0..e1ff8d6fcf 100644 --- a/block/internal/da/interface.go +++ b/block/internal/da/interface.go @@ -31,6 +31,11 @@ type Client interface { // GetLatestDAHeight returns the latest height available on the DA layer. GetLatestDAHeight(ctx context.Context) (uint64, error) + // SupportsSubscribe reports whether the underlying transport supports + // channel-based subscriptions (WebSocket). When false, callers must use + // polling-based retrieval via Retrieve instead. + SupportsSubscribe() bool + // Namespace accessors. GetHeaderNamespace() []byte GetDataNamespace() []byte diff --git a/block/internal/da/subscriber.go b/block/internal/da/subscriber.go index 8ff46773ce..e59d9f17b4 100644 --- a/block/internal/da/subscriber.go +++ b/block/internal/da/subscriber.go @@ -122,10 +122,15 @@ func (s *Subscriber) Start(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) s.cancel = cancel - s.wg.Add(2) s.lifecycleMu.Unlock() - go s.followLoop(ctx) + if s.client.SupportsSubscribe() { + s.wg.Add(2) + go s.followLoop(ctx) + } else { + s.wg.Add(2) + go s.pollLoop(ctx) + } go s.catchupLoop(ctx) return nil @@ -167,6 +172,68 @@ func (s *Subscriber) signalCatchup() { } } +// pollLoop periodically queries the latest DA height and triggers +// catchup when new heights are available. The catchup loop fetches blobs +// via Retrieve (which uses GetAll) so each height is fetched exactly once. +// Periodically checks whether the underlying transport has been upgraded +// to WebSocket and switches to followLoop when that happens. +func (s *Subscriber) pollLoop(ctx context.Context) { + defer s.wg.Done() + + s.logger.Info().Msg("starting poll loop") + defer s.logger.Info().Msg("poll loop stopped") + + // Do an immediate poll on startup so we don't wait for the first tick. + s.pollDAHeight(ctx) + + ticker := time.NewTicker(s.daBlockTime) + defer ticker.Stop() + + for { + // If the transport has been upgraded to WS in the background, + // switch to the subscription-based follow loop. + if s.client.SupportsSubscribe() { + s.logger.Info().Msg("WebSocket available, switching from poll to follow loop") + s.wg.Add(1) + go s.followLoop(ctx) + return + } + + select { + case <-ctx.Done(): + return + case <-ticker.C: + s.pollDAHeight(ctx) + } + } +} + +// pollDAHeight queries GetLatestDAHeight and signals catchup when a new +// height is observed. The actual blob retrieval is done by catchupLoop. +func (s *Subscriber) pollDAHeight(ctx context.Context) { + height, err := s.client.GetLatestDAHeight(ctx) + if err != nil { + if ctx.Err() != nil { + return + } + s.logger.Warn().Err(err).Msg("poll: failed to get latest DA height") + return + } + + cur := s.highestSeenDAHeight.Load() + if height <= cur { + return + } + + s.seenSubscriptionEvent.Store(true) + s.logger.Debug(). + Uint64("new_da_height", height). + Uint64("current_highest_seen", cur). + Msg("poll: observed new DA height") + + s.updateHighest(height) +} + // followLoop subscribes to DA blob events and keeps highestSeenDAHeight up to date. func (s *Subscriber) followLoop(ctx context.Context) { defer s.wg.Done() diff --git a/block/internal/da/tracing.go b/block/internal/da/tracing.go index c41c920494..e8e5ab391a 100644 --- a/block/internal/da/tracing.go +++ b/block/internal/da/tracing.go @@ -165,6 +165,9 @@ func (t *tracedClient) GetForcedInclusionNamespace() []byte { func (t *tracedClient) HasForcedInclusionNamespace() bool { return t.inner.HasForcedInclusionNamespace() } +func (t *tracedClient) SupportsSubscribe() bool { + return t.inner.SupportsSubscribe() +} func (t *tracedClient) Subscribe(ctx context.Context, namespace []byte, includeTimestamp bool) (<-chan datypes.SubscriptionEvent, error) { return t.inner.Subscribe(ctx, namespace, includeTimestamp) } diff --git a/block/internal/da/tracing_test.go b/block/internal/da/tracing_test.go index 9ea344e3ab..74bf3882eb 100644 --- a/block/internal/da/tracing_test.go +++ b/block/internal/da/tracing_test.go @@ -74,6 +74,7 @@ func (m *mockFullClient) GetHeaderNamespace() []byte { func (m *mockFullClient) GetDataNamespace() []byte { return []byte{0x02} } func (m *mockFullClient) GetForcedInclusionNamespace() []byte { return []byte{0x03} } func (m *mockFullClient) HasForcedInclusionNamespace() bool { return true } +func (m *mockFullClient) SupportsSubscribe() bool { return true } // setup a tracer provider + span recorder func setupDATrace(t *testing.T, inner FullClient) (FullClient, *tracetest.SpanRecorder) { diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index a8f1b01102..3d4a21d634 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -116,6 +116,7 @@ func makeSignedHeaderBytes( func setupMockDAClient(tb testing.TB) (da.Client, chan datypes.SubscriptionEvent) { mockClient := testmocks.NewMockClient(tb) eventCh := make(chan datypes.SubscriptionEvent, 1) + mockClient.EXPECT().SupportsSubscribe().Return(true).Maybe() mockClient.EXPECT().Subscribe(mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(eventCh), nil).Maybe() return mockClient, eventCh } diff --git a/pkg/da/jsonrpc/client.go b/pkg/da/jsonrpc/client.go index 664da4114b..36fc2bd808 100644 --- a/pkg/da/jsonrpc/client.go +++ b/pkg/da/jsonrpc/client.go @@ -5,6 +5,8 @@ import ( "fmt" "net/http" "strings" + "sync" + "time" libshare "github.com/celestiaorg/go-square/v3/share" "github.com/filecoin-project/go-jsonrpc" @@ -13,14 +15,28 @@ import ( // Client dials the celestia-node RPC "blob" and "header" namespaces. type Client struct { - Blob BlobAPI - Header HeaderAPI - closer jsonrpc.ClientCloser + Blob BlobAPI + Header HeaderAPI + IsWebSocket bool + + mu sync.Mutex + closer jsonrpc.ClientCloser + retryCancel context.CancelFunc // stops the background WS retry loop } -// Close closes the underlying JSON-RPC connection. +// Close closes the underlying JSON-RPC connection and stops any +// background WebSocket retry loop. func (c *Client) Close() { - if c != nil && c.closer != nil { + if c == nil { + return + } + c.mu.Lock() + if c.retryCancel != nil { + c.retryCancel() + c.retryCancel = nil + } + c.mu.Unlock() + if c.closer != nil { c.closer() } } @@ -72,18 +88,88 @@ func NewClient(ctx context.Context, addr, token string, authHeaderName string) ( // NewWSClient connects to the DA RPC endpoint over WebSocket. // Automatically converts http:// to ws:// (and https:// to wss://). // Supports channel-based subscriptions (e.g. Subscribe). -// Note: WebSocket connections are eager — they connect at creation time -// if the initial WS dial fails, falls back to HTTP polling for the entire session. +// WebSocket connections are eager — they connect at creation time. +// If the initial WS dial fails, it falls back to HTTP polling and spawns a +// background goroutine that periodically retries the WS connection. When +// the WS endpoint becomes reachable, the transport is transparently upgraded. func NewWSClient(ctx context.Context, logger zerolog.Logger, addr, token string, authHeaderName string) (*Client, error) { client, err := NewClient(ctx, httpToWS(addr), token, authHeaderName) if err != nil { logger.Warn().Err(err).Msg("DA websocket connection failed, falling back to DA polling") - return NewClient(ctx, addr, token, authHeaderName) + client, err = NewClient(ctx, addr, token, authHeaderName) + if err != nil { + return nil, err + } + client.IsWebSocket = false + + // Retry WS in the background so transient outages don't force a permanent downgrade. + retryCtx, retryCancel := context.WithCancel(context.Background()) + client.retryCancel = retryCancel + go client.retryWSLoop(retryCtx, logger, addr, token, authHeaderName) + + return client, nil } + client.IsWebSocket = true return client, nil } +const wsRetryInterval = 30 * time.Second + +// retryWSLoop periodically attempts to re-establish a WebSocket connection. +// When successful, it swaps the transport in-place and exits. +func (c *Client) retryWSLoop(ctx context.Context, logger zerolog.Logger, addr, token, authHeaderName string) { + ticker := time.NewTicker(wsRetryInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if c.tryUpgradeWS(ctx, logger, addr, token, authHeaderName) { + return + } + } + } +} + +// tryUpgradeWS attempts to open a WS connection and, if successful, swaps +// the transport internals so subsequent calls use WebSocket. Returns true +// when the upgrade succeeds (or the client is already on WS). +func (c *Client) tryUpgradeWS(ctx context.Context, logger zerolog.Logger, addr, token, authHeaderName string) bool { + wsClient, err := NewClient(ctx, httpToWS(addr), token, authHeaderName) + if err != nil { + return false + } + + c.mu.Lock() + defer c.mu.Unlock() + + // Another goroutine may have already upgraded. + if c.IsWebSocket { + wsClient.Close() + return true + } + + // Swap function pointers from the new WS client into the active client. + c.Blob.Internal = wsClient.Blob.Internal + c.Header.Internal = wsClient.Header.Internal + + // Close the old HTTP connections and wire the new closer. + oldCloser := c.closer + c.closer = func() { + wsClient.closer() + if oldCloser != nil { + oldCloser() + } + } + + c.IsWebSocket = true + logger.Info().Msg("DA websocket connection restored, switching back from HTTP polling") + return true +} + // BlobAPI mirrors celestia-node's blob module (nodebuilder/blob/blob.go). // jsonrpc.NewClient wires Internal.* to RPC stubs. type BlobAPI struct { diff --git a/test/mocks/da.go b/test/mocks/da.go index e1daf70038..1309afc389 100644 --- a/test/mocks/da.go +++ b/test/mocks/da.go @@ -7,7 +7,7 @@ package mocks import ( "context" - "github.com/evstack/ev-node/pkg/da/types" + da "github.com/evstack/ev-node/pkg/da/types" mock "github.com/stretchr/testify/mock" ) @@ -555,6 +555,50 @@ func (_c *MockClient_Submit_Call) RunAndReturn(run func(ctx context.Context, dat return _c } +// SupportsSubscribe provides a mock function for the type MockClient +func (_mock *MockClient) SupportsSubscribe() bool { + ret := _mock.Called() + + if len(ret) == 0 { + panic("no return value specified for SupportsSubscribe") + } + + var r0 bool + if returnFunc, ok := ret.Get(0).(func() bool); ok { + r0 = returnFunc() + } else { + r0 = ret.Get(0).(bool) + } + return r0 +} + +// MockClient_SupportsSubscribe_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SupportsSubscribe' +type MockClient_SupportsSubscribe_Call struct { + *mock.Call +} + +// SupportsSubscribe is a helper method to define mock.On call +func (_e *MockClient_Expecter) SupportsSubscribe() *MockClient_SupportsSubscribe_Call { + return &MockClient_SupportsSubscribe_Call{Call: _e.mock.On("SupportsSubscribe")} +} + +func (_c *MockClient_SupportsSubscribe_Call) Run(run func()) *MockClient_SupportsSubscribe_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockClient_SupportsSubscribe_Call) Return(b bool) *MockClient_SupportsSubscribe_Call { + _c.Call.Return(b) + return _c +} + +func (_c *MockClient_SupportsSubscribe_Call) RunAndReturn(run func() bool) *MockClient_SupportsSubscribe_Call { + _c.Call.Return(run) + return _c +} + // Subscribe provides a mock function for the type MockClient func (_mock *MockClient) Subscribe(ctx context.Context, namespace []byte, fetchTimestamp bool) (<-chan da.SubscriptionEvent, error) { ret := _mock.Called(ctx, namespace, fetchTimestamp) diff --git a/test/testda/dummy.go b/test/testda/dummy.go index e1f93642c4..04599a8f58 100644 --- a/test/testda/dummy.go +++ b/test/testda/dummy.go @@ -243,6 +243,10 @@ func (d *DummyDA) GetForcedInclusionNamespace() []byte { return nil } // HasForcedInclusionNamespace reports whether forced inclusion is configured. func (d *DummyDA) HasForcedInclusionNamespace() bool { return false } +// SupportsSubscribe reports whether the underlying transport supports +// channel-based subscriptions. +func (d *DummyDA) SupportsSubscribe() bool { return true } + // GetLatestDAHeight returns the current DA height (the latest height available). func (d *DummyDA) GetLatestDAHeight(_ context.Context) (uint64, error) { return d.height.Load(), nil From 2b5828d230f27c6e43d739753871a067ade07687 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Fri, 19 Jun 2026 15:11:59 +0200 Subject: [PATCH 2/4] feedback --- CHANGELOG.md | 2 +- block/internal/da/client.go | 9 +++++---- block/internal/da/subscriber.go | 9 ++++++--- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0849ae81c1..9ed3d72306 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,7 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed -- DA client falls back to HTTP polling with `Retrieve` when the WebSocket connection fails, instead of trying to use the WS-only `Subscribe` over HTTP. Automatically upgrade to WS is available [#3211](https://github.com/evstack/ev-node/pull/3361) +- DA client falls back to HTTP polling with `Retrieve` when the WebSocket connection fails, instead of trying to use the WS-only `Subscribe` over HTTP. A background goroutine retries WS every 30s so transient outages don't force a permanent downgrade [#3361](https://github.com/evstack/ev-node/pull/3361) ## v1.1.3 diff --git a/block/internal/da/client.go b/block/internal/da/client.go index b16950ce21..2ffba1d18d 100644 --- a/block/internal/da/client.go +++ b/block/internal/da/client.go @@ -32,13 +32,13 @@ type Config struct { type client struct { blobAPI *blobrpc.BlobAPI headerAPI *blobrpc.HeaderAPI + da *blobrpc.Client // kept to read live IsWebSocket after transport upgrades logger zerolog.Logger defaultTimeout time.Duration namespaceBz []byte dataNamespaceBz []byte forcedNamespaceBz []byte hasForcedNamespace bool - isWebSocket bool timestampCache *blockTimestampCache } @@ -132,13 +132,13 @@ func NewClient(cfg Config) FullClient { return &client{ blobAPI: &cfg.DA.Blob, headerAPI: &cfg.DA.Header, + da: cfg.DA, logger: cfg.Logger.With().Str("component", "da_client").Logger(), defaultTimeout: cfg.DefaultTimeout, namespaceBz: datypes.NamespaceFromString(cfg.Namespace).Bytes(), dataNamespaceBz: datypes.NamespaceFromString(cfg.DataNamespace).Bytes(), forcedNamespaceBz: forcedNamespaceBz, hasForcedNamespace: hasForcedNamespace, - isWebSocket: cfg.DA.IsWebSocket, timestampCache: newBlockTimestampCache(blockTimestampCacheWindow), } } @@ -488,9 +488,10 @@ func (c *client) HasForcedInclusionNamespace() bool { } // SupportsSubscribe reports whether the underlying transport supports -// channel-based subscriptions (WebSocket). +// channel-based subscriptions (WebSocket). Reads the live IsWebSocket flag +// from the jsonrpc client so transport upgrades are visible immediately. func (c *client) SupportsSubscribe() bool { - return c.isWebSocket + return c.da != nil && c.da.IsWebSocket } // Subscribe subscribes to blobs in the given namespace via the celestia-node diff --git a/block/internal/da/subscriber.go b/block/internal/da/subscriber.go index e59d9f17b4..cf3dd49084 100644 --- a/block/internal/da/subscriber.go +++ b/block/internal/da/subscriber.go @@ -124,11 +124,10 @@ func (s *Subscriber) Start(ctx context.Context) error { s.cancel = cancel s.lifecycleMu.Unlock() + s.wg.Add(2) if s.client.SupportsSubscribe() { - s.wg.Add(2) go s.followLoop(ctx) } else { - s.wg.Add(2) go s.pollLoop(ctx) } go s.catchupLoop(ctx) @@ -186,7 +185,11 @@ func (s *Subscriber) pollLoop(ctx context.Context) { // Do an immediate poll on startup so we don't wait for the first tick. s.pollDAHeight(ctx) - ticker := time.NewTicker(s.daBlockTime) + interval := s.daBlockTime + if interval <= 0 { + interval = 2 * time.Second + } + ticker := time.NewTicker(interval) defer ticker.Stop() for { From 5bbfaf262726d33a1fab60ffa432e14663ebd73d Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Fri, 19 Jun 2026 15:28:11 +0200 Subject: [PATCH 3/4] fix race --- block/internal/da/client.go | 2 +- pkg/da/jsonrpc/client.go | 16 +++++++++------- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/block/internal/da/client.go b/block/internal/da/client.go index 2ffba1d18d..0ffd7e41ed 100644 --- a/block/internal/da/client.go +++ b/block/internal/da/client.go @@ -491,7 +491,7 @@ func (c *client) HasForcedInclusionNamespace() bool { // channel-based subscriptions (WebSocket). Reads the live IsWebSocket flag // from the jsonrpc client so transport upgrades are visible immediately. func (c *client) SupportsSubscribe() bool { - return c.da != nil && c.da.IsWebSocket + return c.da != nil && c.da.IsWebSocket.Load() } // Subscribe subscribes to blobs in the given namespace via the celestia-node diff --git a/pkg/da/jsonrpc/client.go b/pkg/da/jsonrpc/client.go index 36fc2bd808..bfd6cb9a26 100644 --- a/pkg/da/jsonrpc/client.go +++ b/pkg/da/jsonrpc/client.go @@ -6,6 +6,7 @@ import ( "net/http" "strings" "sync" + "sync/atomic" "time" libshare "github.com/celestiaorg/go-square/v3/share" @@ -17,7 +18,7 @@ import ( type Client struct { Blob BlobAPI Header HeaderAPI - IsWebSocket bool + IsWebSocket atomic.Bool mu sync.Mutex closer jsonrpc.ClientCloser @@ -35,9 +36,10 @@ func (c *Client) Close() { c.retryCancel() c.retryCancel = nil } + closer := c.closer c.mu.Unlock() - if c.closer != nil { - c.closer() + if closer != nil { + closer() } } @@ -100,7 +102,7 @@ func NewWSClient(ctx context.Context, logger zerolog.Logger, addr, token string, if err != nil { return nil, err } - client.IsWebSocket = false + client.IsWebSocket.Store(false) // Retry WS in the background so transient outages don't force a permanent downgrade. retryCtx, retryCancel := context.WithCancel(context.Background()) @@ -110,7 +112,7 @@ func NewWSClient(ctx context.Context, logger zerolog.Logger, addr, token string, return client, nil } - client.IsWebSocket = true + client.IsWebSocket.Store(true) return client, nil } @@ -147,7 +149,7 @@ func (c *Client) tryUpgradeWS(ctx context.Context, logger zerolog.Logger, addr, defer c.mu.Unlock() // Another goroutine may have already upgraded. - if c.IsWebSocket { + if c.IsWebSocket.Load() { wsClient.Close() return true } @@ -165,7 +167,7 @@ func (c *Client) tryUpgradeWS(ctx context.Context, logger zerolog.Logger, addr, } } - c.IsWebSocket = true + c.IsWebSocket.Store(true) logger.Info().Msg("DA websocket connection restored, switching back from HTTP polling") return true } From 9fc827b548bcce9254f822fa734d8bb1b2589e3c Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Fri, 19 Jun 2026 16:41:21 +0200 Subject: [PATCH 4/4] update mocks --- pkg/sequencers/single/sequencer_test.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/pkg/sequencers/single/sequencer_test.go b/pkg/sequencers/single/sequencer_test.go index a2374bb192..362f265c18 100644 --- a/pkg/sequencers/single/sequencer_test.go +++ b/pkg/sequencers/single/sequencer_test.go @@ -50,6 +50,7 @@ func setupForcedInclusionMockDA(t *testing.T, mockDA *MockFullDAClient, latestDA mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + mockDA.MockClient.On("SupportsSubscribe").Return(true).Maybe() mockDA.MockClient.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe() mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(latestDAHeight, nil).Maybe() @@ -910,6 +911,7 @@ func TestSequencer_CheckpointPersistence_CrashRecovery(t *testing.T) { mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + mockDA.MockClient.On("SupportsSubscribe").Return(true).Maybe() mockDA.MockClient.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe() // DA head is at 101 — close to sequencer start (100), no catch-up needed. @@ -1019,6 +1021,7 @@ func TestSequencer_GetNextBatch_EmptyDABatch_IncreasesDAHeight(t *testing.T) { mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + mockDA.MockClient.On("SupportsSubscribe").Return(true).Maybe() mockDA.MockClient.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe() // DA head is at 100 — same as sequencer start, no catch-up needed @@ -1117,6 +1120,7 @@ func TestSequencer_GetNextBatch_WithGasFiltering(t *testing.T) { mockDA.MockClient.On("GetBlobsAtHeight", mock.Anything, mock.Anything, mock.Anything). Return(forcedTxs, nil).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + mockDA.MockClient.On("SupportsSubscribe").Return(true).Maybe() mockDA.MockClient.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe() mockDA.MockClient.On("GetForcedInclusionNamespace").Return([]byte("forced")).Maybe() mockDA.MockClient.On("MaxBlobSize", mock.Anything).Return(uint64(1000000), nil).Maybe() @@ -1224,6 +1228,7 @@ func TestSequencer_GetNextBatch_GasFilterError(t *testing.T) { mockDA := newMockFullDAClient(t) mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + mockDA.MockClient.On("SupportsSubscribe").Return(true).Maybe() mockDA.MockClient.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe() mockDA.MockClient.On("Retrieve", mock.Anything, mock.Anything, mock.Anything).Return(datypes.ResultRetrieve{ BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, @@ -1295,6 +1300,7 @@ func TestSequencer_CatchUp_DetectsOldEpoch(t *testing.T) { mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + mockDA.MockClient.On("SupportsSubscribe").Return(true).Maybe() mockDA.MockClient.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe() // DA head is at height 105 — sequencer starts at 100 with epoch size 1, @@ -1369,6 +1375,7 @@ func TestSequencer_CatchUp_SkipsMempoolDuringCatchUp(t *testing.T) { mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + mockDA.MockClient.On("SupportsSubscribe").Return(true).Maybe() mockDA.MockClient.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe() // DA head is at 105 — sequencer starts at 100 with epoch size 1, @@ -1465,6 +1472,7 @@ func TestSequencer_CatchUp_UsesDATimestamp(t *testing.T) { mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + mockDA.MockClient.On("SupportsSubscribe").Return(true).Maybe() mockDA.MockClient.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe() // DA head is at 105 — multiple epochs ahead, triggers catch-up @@ -1528,6 +1536,7 @@ func TestSequencer_CatchUp_ExitsCatchUpAtDAHead(t *testing.T) { mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + mockDA.MockClient.On("SupportsSubscribe").Return(true).Maybe() mockDA.MockClient.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe() // DA head is at 105 — multiple epochs ahead, triggers catch-up @@ -1737,6 +1746,7 @@ func TestSequencer_CatchUp_MultiEpochReplay(t *testing.T) { mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + mockDA.MockClient.On("SupportsSubscribe").Return(true).Maybe() mockDA.MockClient.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe() // DA head is at 106 — sequencer starts at 100 with epoch size 1, @@ -1892,6 +1902,7 @@ func TestSequencer_CatchUp_CheckpointAdvancesDuringCatchUp(t *testing.T) { mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + mockDA.MockClient.On("SupportsSubscribe").Return(true).Maybe() mockDA.MockClient.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe() // DA head is at 105 — multiple epochs ahead, triggers catch-up @@ -1991,6 +2002,7 @@ func TestSequencer_CatchUp_MonotonicTimestamps(t *testing.T) { mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + mockDA.MockClient.On("SupportsSubscribe").Return(true).Maybe() mockDA.MockClient.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe() // DA head is far ahead — triggers catch-up @@ -2121,6 +2133,7 @@ func TestSequencer_CatchUp_MonotonicTimestamps_EmptyEpoch(t *testing.T) { mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + mockDA.MockClient.On("SupportsSubscribe").Return(true).Maybe() mockDA.MockClient.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe() mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(110), nil).Once() @@ -2201,6 +2214,7 @@ func TestSequencer_GetNextBatch_GasFilteringPreservesUnprocessedTxs(t *testing.T mockDA := newMockFullDAClient(t) mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + mockDA.MockClient.On("SupportsSubscribe").Return(true).Maybe() mockDA.MockClient.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe() mockDA.MockClient.On("GetForcedInclusionNamespace").Return([]byte("forced")).Maybe() mockDA.MockClient.On("MaxBlobSize", mock.Anything).Return(uint64(1000000), nil).Maybe()