Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [7.0.6]

[7.0.6]: https://github.com/microsoft/CCF/releases/tag/ccf-7.0.6

### Fixed

- Forwarded commands are no longer processed until the node is part of the network, matching the existing behaviour for other node-to-node messages. Previously a forwarded command could be executed while the node was in an earlier startup state, which could lead to undefined behaviour for some commands (#7936).

## [7.0.5]

[7.0.5]: https://github.com/microsoft/CCF/releases/tag/ccf-7.0.5
Expand Down
10 changes: 10 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,11 @@ if(BUILD_TESTS)
${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/parse_json_safe.cpp
)

add_unit_test(
state_machine_test
${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/state_machine.cpp
)

add_unit_test(
logger_test
${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/logger.cpp
Expand Down Expand Up @@ -780,6 +785,11 @@ if(BUILD_TESTS)
PRIVATE ccf_kv ccf_endpoints ccf_tasks
)

add_unit_test(
node_inbound_message_test
${CMAKE_CURRENT_SOURCE_DIR}/src/node/test/node_inbound_message.cpp
)

add_unit_test(
indexing_test
${CMAKE_CURRENT_SOURCE_DIR}/src/indexing/test/indexing.cpp
Expand Down
2 changes: 1 addition & 1 deletion python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "ccf"
version = "7.0.5"
version = "7.0.6"
authors = [
{ name="CCF Team", email="[email protected]" },
]
Expand Down
7 changes: 7 additions & 0 deletions src/ds/state_machine.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include "ds/internal_logger.h"

#include <atomic>
#include <set>
#include <stdexcept>
#include <string>
#include <utility>
Comment thread
Copilot marked this conversation as resolved.

Expand Down Expand Up @@ -37,6 +39,11 @@ namespace ds
return state_ == state.load();
}

[[nodiscard]] bool check_one_of(const std::set<T>& states) const
{
return states.contains(state.load());
}

[[nodiscard]] T value() const
{
return state.load();
Expand Down
102 changes: 102 additions & 0 deletions src/ds/test/state_machine.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.

#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN
#include "../state_machine.h"

#include <doctest/doctest.h>

namespace
{
enum class Example
{
A,
B,
C,
D
};
}

template <>
struct fmt::formatter<Example> : fmt::formatter<std::string_view>
{
template <typename FormatContext>
auto format(Example e, FormatContext& ctx) const
{
std::string_view name = "unknown";
switch (e)
{
case Example::A:
name = "A";
break;
case Example::B:
name = "B";
break;
case Example::C:
name = "C";
break;
case Example::D:
name = "D";
break;
}
return fmt::formatter<std::string_view>::format(name, ctx);
}
};

TEST_CASE("Basic state machine" * doctest::test_suite("state_machine"))
{
ds::StateMachine<Example> sm("example", Example::A);

REQUIRE(sm.value() == Example::A);
REQUIRE(sm.check(Example::A));
REQUIRE_FALSE(sm.check(Example::B));
REQUIRE_NOTHROW(sm.expect(Example::A));
REQUIRE_THROWS_AS(sm.expect(Example::B), std::logic_error);

sm.advance(Example::B);

REQUIRE(sm.value() == Example::B);
REQUIRE(sm.check(Example::B));
REQUIRE_FALSE(sm.check(Example::A));
REQUIRE_NOTHROW(sm.expect(Example::B));
REQUIRE_THROWS_AS(sm.expect(Example::A), std::logic_error);
}

TEST_CASE("check_one_of" * doctest::test_suite("state_machine"))
{
ds::StateMachine<Example> sm("example", Example::A);

{
INFO("Current state is part of the set");
REQUIRE(sm.check_one_of({Example::A}));
REQUIRE(sm.check_one_of({Example::A, Example::B}));
REQUIRE(sm.check_one_of({Example::A, Example::B, Example::C}));
}

{
INFO("Current state is not part of the set");
REQUIRE_FALSE(sm.check_one_of({Example::B}));
REQUIRE_FALSE(sm.check_one_of({Example::B, Example::C}));
REQUIRE_FALSE(sm.check_one_of({Example::B, Example::C, Example::D}));
}

{
INFO("Empty set never matches");
REQUIRE_FALSE(sm.check_one_of({}));
}

{
INFO("Set membership follows state transitions");
const std::set<Example> states{Example::B, Example::C};
REQUIRE_FALSE(sm.check_one_of(states));

sm.advance(Example::B);
REQUIRE(sm.check_one_of(states));

sm.advance(Example::C);
REQUIRE(sm.check_one_of(states));

sm.advance(Example::D);
REQUIRE_FALSE(sm.check_one_of(states));
}
}
90 changes: 90 additions & 0 deletions src/node/node_inbound_message.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
#pragma once

#include "ccf/entity_id.h"
#include "ccf/node_startup_state.h"
#include "ds/internal_logger.h"
#include "ds/state_machine.h"
#include "node/node_types.h"

namespace ccf
{
// Reads a serialised node_inbound message and dispatches it to the
// appropriate handler, but only once the node is part of the network. This
// includes forwarded commands, which must not be executed until the node is
// part of the network, as some commands may otherwise exhibit undefined
// behaviour.
template <typename TForwarder, typename TChannels, typename TConsensus>
void recv_node_inbound_message(
const uint8_t* data,
size_t size,
::ds::StateMachine<NodeStartupState>& sm,
TForwarder* cmd_forwarder,
TChannels* n2n_channels,
TConsensus* consensus)
{
auto [msg_type, from, payload] =
ringbuffer::read_message<node_inbound>(data, size);

const auto* payload_data = payload.data;
auto payload_size = payload.size;

static const std::set<NodeStartupState> active_states{
NodeStartupState::partOfNetwork,
NodeStartupState::partOfPublicNetwork,
NodeStartupState::readingPrivateLedger};

if (!sm.check_one_of(active_states))
{
LOG_DEBUG_FMT(
"Ignoring node msg received too early - current state is {}",
sm.value());
return;
}

switch (msg_type)
{
case forwarded_msg:
{
if (cmd_forwarder == nullptr)
{
LOG_FAIL_FMT(
"Ignoring forwarded node message: command forwarder not "
"initialised");
return;
}
cmd_forwarder->recv_message(from, payload_data, payload_size);
return;
}
case channel_msg:
{
if (n2n_channels == nullptr)
{
LOG_FAIL_FMT(
"Ignoring channel node message: node-to-node channels not "
"initialised");
return;
}
n2n_channels->recv_channel_message(from, payload_data, payload_size);
return;
}
case consensus_msg:
{
if (consensus == nullptr)
{
LOG_FAIL_FMT(
"Ignoring consensus node message: consensus not initialised");
return;
}
consensus->recv_message(from, payload_data, payload_size);
return;
}
default:
{
throw std::logic_error(fmt::format(
"Unknown node message type: {}", static_cast<uint32_t>(msg_type)));
}
}
}
}
59 changes: 8 additions & 51 deletions src/node/node_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "node/ledger_secret.h"
#include "node/ledger_secrets.h"
#include "node/local_sealing.h"
#include "node/node_inbound_message.h"
#include "node/node_to_node_channel_manager.h"
#include "node/recovery_decision_protocol.h"
#include "node/signature_cache_subsystem.h"
Expand Down Expand Up @@ -2254,57 +2255,13 @@ namespace ccf

void recv_node_inbound(const uint8_t* data, size_t size)
{
auto [msg_type, from, payload] =
ringbuffer::read_message<node_inbound>(data, size);

const auto* payload_data = payload.data;
auto payload_size = payload.size;

if (msg_type == NodeMsgType::forwarded_msg)
{
cmd_forwarder->recv_message(from, payload_data, payload_size);
}
else
{
// Only process messages once part of network
if (
!sm.check(NodeStartupState::partOfNetwork) &&
!sm.check(NodeStartupState::partOfPublicNetwork) &&
!sm.check(NodeStartupState::readingPrivateLedger))
{
LOG_DEBUG_FMT(
"Ignoring node msg received too early - current state is {}",
sm.value());
return;
}

switch (msg_type)
{
case forwarded_msg:
{
LOG_FAIL_FMT("Unexpected forwarded_msg in recv_node_inbound");
return;
}
case channel_msg:
{
n2n_channels->recv_channel_message(
from, payload_data, payload_size);
return;
}

case consensus_msg:
{
consensus->recv_message(from, payload_data, payload_size);
return;
}
default:
{
throw std::logic_error(fmt::format(
"Unknown node message type: {}",
static_cast<uint32_t>(msg_type)));
}
}
}
recv_node_inbound_message(
data,
size,
sm,
cmd_forwarder.get(),
n2n_channels.get(),
consensus.get());
}
Comment thread
achamayou marked this conversation as resolved.

//
Expand Down
Loading