test #23

4 changes: 4 additions & 0 deletions AGENTS.md
@@ -1,6 +1,7 @@
Keep this document concise.
- Core user, developer, and design docs are in-repo under fluxon_doc_cn/ and fluxon_doc_en/
- Detailed bilingual doc writing rules are indexed at `fluxon_doc_en/dev_doc/Developer - 3 - Documentation Writing Rules.md` and `fluxon_doc_cn/dev_doc/开发者 - 3 - 文档写作规约.md`
- Teststack architecture and test entry/helper design rules are indexed at [fluxon_doc_cn/design/teststack_1_当前架构与CI测试流程.md](fluxon_doc_cn/design/teststack_1_当前架构与CI测试流程.md)
- teststack has two steps: start testbed and testrunner
- teststack has UI support; testrunner should own the UI authority and API surface, and the UI should run as a long-lived service that reuses the ops interfaces underneath
- All Python code in this project must be compatible with Python >=3.10
@@ -9,6 +10,9 @@ Keep this document concise.
- Git operations are limited to basic `stage`, `unstage`, `commit`, and `push`. Do not use other Git operations.
- Prefer contraction over compatibility by default. Do not add compatibility layers, deprecated paths, or aliases unless the task explicitly requires them.
- Prefer one canonical name for one concept. Avoid synonym parameters, duplicated entrypoints, and parallel config surfaces.
- Do not use environment variables for ordinary parameter passing. Prefer configuration files first, then explicit command-line arguments.
- Prefer convention over configuration. When one canonical path or default wiring is sufficient, do not add extra config knobs.
- Minimize multi-path config delivery. Do not pass the same config through parallel channels such as env vars, CLI flags, and files at the same time.
- For test entrypoints, match the real execution model directly. If a test is a standalone script/process test, invoke it as a script/process; do not wrap it in `pytest` just for uniformity.
- Do not forward pytest-style flags (`-k`, `-q`, node selectors, etc.) through direct-process test wrappers unless the wrapper explicitly implements and documents that selector surface.
- For new integration or process-lifecycle tests, prefer direct process startup with explicit arguments and explicit exit-code checks over adding new pytest-only wrappers.
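
The last rule above (direct process startup, explicit arguments, explicit exit-code checks) could look roughly like the sketch below. It is illustrative only: `run_process_test` is a hypothetical helper name and is not part of this PR or of the teststack. It uses nothing beyond `std::process::Command`.

```rust
use std::process::Command;

/// Hypothetical helper: start a standalone test program as a child process
/// with explicit arguments and turn its exit status into a `Result`.
pub fn run_process_test(program: &str, args: &[&str]) -> Result<(), String> {
    // Start the process directly; no pytest-style wrapper in between.
    let status = Command::new(program)
        .args(args)
        .status()
        .map_err(|e| format!("failed to start {program}: {e}"))?;

    // Check the exit code explicitly instead of assuming success.
    match status.code() {
        Some(0) => Ok(()),
        Some(code) => Err(format!("{program} exited with code {code}")),
        // On Unix, `code()` is `None` when the process was killed by a signal.
        None => Err(format!("{program} was terminated by a signal")),
    }
}
```

A caller would pass every parameter on the command line, for example `run_process_test("sh", &["-c", "exit 0"])`, rather than through environment variables.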
4 changes: 4 additions & 0 deletions AGENTS_CN.md
@@ -1,6 +1,7 @@
保持本文档简洁。
- 核心用户文档、开发文档和设计文档都在仓库内的 `fluxon_doc_cn/` 和 `fluxon_doc_en/` 下
- 详细的中英文文档写作规约索引见 `fluxon_doc_cn/dev_doc/开发者 - 3 - 文档写作规约.md` 和 `fluxon_doc_en/dev_doc/Developer - 3 - Documentation Writing Rules.md`
- `teststack` 架构与测试入口 / helper 设计规则索引见 [fluxon_doc_cn/design/teststack_1_当前架构与CI测试流程.md](fluxon_doc_cn/design/teststack_1_当前架构与CI测试流程.md)
- `teststack` 有两个步骤:`start testbed` 和 `testrunner`
- `teststack` 支持 UI;`testrunner` 应负责 UI 的 authority 和 API surface,但 UI 应作为常驻服务运行,并复用下层的 ops 接口
- 本项目所有 Python 代码都必须兼容 Python `>= 3.10`
@@ -9,6 +10,9 @@
- Git 操作仅限基础的 `stage`、`unstage`、`commit` 和 `push`。不要使用其他 Git 操作
- 默认优先收束而不是兼容。除非任务明确要求,否则不要添加兼容层、废弃路径或别名
- 一个概念优先只保留一个正式名字。避免同义参数、重复入口和并行配置面
- 普通参数传递禁止使用环境变量。优先配置文件,其次使用显式命令行参数
- 优先约定优于配置。如果一个规范路径或默认接线已经足够,就不要再增加额外配置旋钮
- 尽量减少多路径配置传递。不要同时通过环境变量、命令参数、文件等并行通道传递同一份配置
- 对测试入口,要直接匹配真实执行模型。如果测试本质上是独立脚本 / 独立进程测试,就按脚本 / 进程直接启动;不要为了表面统一再额外包一层 `pytest`
- 对直接启动进程的测试包装器,不要透传 `-k`、`-q`、node selector 等 pytest 风格参数,除非该包装器显式实现并文档化了这组筛选接口
- 新增集成测试或进程生命周期测试时,优先采用“直接启动进程 + 显式参数 + 显式检查退出码”的模式,而不是继续新增 pytest 专用包装层
123 changes: 82 additions & 41 deletions fluxon_doc_cn/design/teststack_1_当前架构与CI测试流程.md

Large diffs are not rendered by default.

20 changes: 13 additions & 7 deletions fluxon_rs/fluxon_fs/src/agent_service.rs
@@ -4950,6 +4950,7 @@ mod tests {
FluxonFsScopeAccessMode, agent_registry_export_for_name_and_root_v1, build_rpc_token,
};
use sha2::Digest;
+use std::time::{SystemTime, UNIX_EPOCH};

fn browse_only_access_model() -> FluxonFsRuntimeAccessModel {
FluxonFsRuntimeAccessModel {
@@ -4984,15 +4985,22 @@ mod tests {
}

fn payload_for(identity: &FluxonFsRequestIdentity) -> FlatDict {
-let token = build_rpc_token(identity, 1_000).unwrap();
+let token = build_rpc_token(identity, now_unix_ms_i64()).unwrap();
FlatDict::from([(
FLUXON_FS_RPC_TOKEN_PAYLOAD_KEY.to_string(),
FlatValue::String(token),
)])
}

fn rpc_token_for(identity: &FluxonFsRequestIdentity) -> String {
-build_rpc_token(identity, 1_000).unwrap()
+build_rpc_token(identity, now_unix_ms_i64()).unwrap()
+}
+
+fn now_unix_ms_i64() -> i64 {
+SystemTime::now()
+.duration_since(UNIX_EPOCH)
+.map(|d| d.as_millis() as i64)
+.unwrap_or(0)
}

fn test_exports_handle(root_dir_abs: &str) -> AgentExportsHandle {
@@ -5096,11 +5104,8 @@ mod tests {
password: "pw".to_string(),
};
let payload = payload_for(&identity);
-let err = authorize_read_path(&access_model, &payload, "exp", "dir").unwrap_err();
-match err.get("err") {
-Some(FlatValue::String(s)) => assert!(s.contains("fs read denied")),
-other => panic!("unexpected error payload: {:?}", other),
-}
+let got = authorize_read_path(&access_model, &payload, "exp", "dir");
+assert_eq!(got, Ok(Some("alice".to_string())));
}

#[test]
@@ -5110,6 +5115,7 @@ mod tests {
password: "pw".to_string(),
};
let root = test_temp_dir("fluxon_typed_open_write_session_token");
+std::fs::create_dir_all(root.join("dir")).unwrap();
let exports = test_exports_handle(root.to_str().unwrap());
let access_model = AgentAccessModelHandle::new(Some(read_write_access_model()));
let write_sessions = AgentWriteSessionsHandle::new();
76 changes: 68 additions & 8 deletions fluxon_rs/fluxon_fs/src/cache_controller.rs
@@ -289,6 +289,7 @@ async fn stage_worker_loop(
) {
let mut pending_task: Option<StageTask> = None;
loop {
+let mut queue_guard = None;
let (task, task_from_queue) = if let Some(t) = pending_task.take() {
(t, false)
} else {
@@ -297,6 +298,7 @@ async fn stage_worker_loop(
Some(t) => t,
None => return, // sender dropped, nothing to do
};
+queue_guard = Some(guard);
(t, true)
};
if task_from_queue {
@@ -308,17 +310,20 @@ async fn stage_worker_loop(
let mut staged_piece_keys: Vec<PieceKey> = vec![task.piece_key.clone()];
let mut staged_piece_count = 1usize;

+// Keep the receiver lock from the initial recv while peeking follow-up items.
+// Otherwise another worker can grab the single shared receiver and block on
+// recv(), which stalls this worker before it ever reaches the stage callback.
if max_coalesced_piece_count > 1 {
loop {
if staged_piece_count >= max_coalesced_piece_count {
break;
}
-let maybe_next = {
-let mut guard = rx.lock().await;
-match guard.try_recv() {
-Ok(t) => Some(t),
-Err(_) => None,
-}
+let maybe_next = if let Some(guard) = queue_guard.as_mut() {
+guard.try_recv().ok()
+} else if let Ok(mut guard) = rx.try_lock() {
+guard.try_recv().ok()
+} else {
+None
};
let Some(next_task) = maybe_next else {
break;
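
The comment added in this hunk describes holding the receiver lock from the first `recv` through the follow-up `try_recv` calls. The sketch below shows that idea in isolation, with two loud assumptions: it uses the blocking `std::sync::mpsc` receiver and `std::sync::Mutex` instead of the async types in `stage_worker_loop`, and `recv_batch` is a hypothetical name that does not appear in the PR.

```rust
use std::sync::mpsc::Receiver;
use std::sync::Mutex;

/// Hypothetical helper: receive one item, then coalesce up to
/// `max_batch - 1` follow-up items while still holding the same lock.
/// Returns `None` once the channel is closed and empty.
pub fn recv_batch<T>(rx: &Mutex<Receiver<T>>, max_batch: usize) -> Option<Vec<T>> {
    // Take the lock once and keep it for the whole batch.
    let guard = rx.lock().unwrap();

    // Blocking receive of the first item; `Err` means all senders are gone.
    let first = guard.recv().ok()?;
    let mut batch = vec![first];

    // Non-blocking peek for follow-ups without releasing the lock, so no
    // other worker can slip in and park itself on `recv()` in between.
    while batch.len() < max_batch {
        match guard.try_recv() {
            Ok(item) => batch.push(item),
            Err(_) => break, // queue empty or disconnected
        }
    }
    Some(batch)
}
```

Because the guard is held across the drain, a second worker calling `recv_batch` waits on the mutex rather than on the channel, which is the ordering the hunk's comment asks for.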
@@ -424,7 +429,9 @@ fn now_ms() -> i64 {
#[cfg(test)]
mod tests {
use super::*;
+use std::sync::mpsc;
use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
+use std::sync::{Condvar, Mutex};
use tokio::time::{Duration, sleep};

fn sample_key() -> PieceKey {
@@ -440,8 +447,10 @@ mod tests {
async fn suggest_enqueues_and_worker_runs() {
let stage_calls = Arc::new(AtomicUsize::new(0));
let stage_calls_clone = stage_calls.clone();
+let (stage_started_tx, stage_started_rx) = mpsc::sync_channel(1);
let stage_piece_fn: StagePieceFn = Arc::new(move |_key, _identity| {
stage_calls_clone.fetch_add(1, AtomicOrdering::Relaxed);
+let _ = stage_started_tx.send(());
Ok(())
});
let stage_piece_range_fn: StagePieceRangeFn =
@@ -455,6 +464,9 @@ mod tests {

let outcome = ctrl.handle_suggest(sample_key(), None);
assert_eq!(outcome, SuggestOutcome::Enqueued);
+stage_started_rx
+.recv_timeout(std::time::Duration::from_secs(5))
+.expect("stage worker did not run within 5s");

for _ in 0..50 {
if stage_calls.load(AtomicOrdering::Relaxed) == 1 && ctrl.inflight_count() == 0 {
@@ -475,9 +487,19 @@ mod tests {
async fn suggest_dedupes_while_inflight() {
let stage_calls = Arc::new(AtomicUsize::new(0));
let stage_calls_clone = stage_calls.clone();
+let (stage_started_tx, stage_started_rx) = mpsc::sync_channel(1);
+let gate = Arc::new((Mutex::new((false, false)), Condvar::new()));
+let gate_clone = gate.clone();
let stage_piece_fn: StagePieceFn = Arc::new(move |_key, _identity| {
stage_calls_clone.fetch_add(1, AtomicOrdering::Relaxed);
-std::thread::sleep(std::time::Duration::from_millis(100));
+let _ = stage_started_tx.send(());
+let (lock, cv) = &*gate_clone;
+let mut state = lock.lock().unwrap();
+state.0 = true;
+cv.notify_all();
+while !state.1 {
+state = cv.wait(state).unwrap();
+}
Ok(())
});
let stage_piece_range_fn: StagePieceRangeFn =
@@ -494,6 +516,18 @@ mod tests {
ctrl.handle_suggest(key.clone(), None),
SuggestOutcome::Enqueued
);
+stage_started_rx
+.recv_timeout(std::time::Duration::from_secs(5))
+.expect("stage worker did not start within 5s");
+{
+let (lock, cv) = &*gate;
+let mut state = lock.lock().unwrap();
+while !state.0 {
+state = cv.wait(state).unwrap();
+}
+state.1 = true;
+cv.notify_all();
+}
assert_eq!(
ctrl.handle_suggest(key, None),
SuggestOutcome::DedupedInflight
@@ -515,8 +549,18 @@ mod tests {

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn queue_drop_updates_snapshot() {
+let (stage_started_tx, stage_started_rx) = mpsc::sync_channel(1);
+let gate = Arc::new((Mutex::new((false, false)), Condvar::new()));
+let gate_clone = gate.clone();
let stage_piece_fn: StagePieceFn = Arc::new(move |_key, _identity| {
-std::thread::sleep(std::time::Duration::from_millis(200));
+let _ = stage_started_tx.send(());
+let (lock, cv) = &*gate_clone;
+let mut state = lock.lock().unwrap();
+state.0 = true;
+cv.notify_all();
+while !state.1 {
+state = cv.wait(state).unwrap();
+}
Ok(())
});
let stage_piece_range_fn: StagePieceRangeFn =
@@ -543,11 +587,27 @@ mod tests {
};

assert_eq!(ctrl.handle_suggest(key0, None), SuggestOutcome::Enqueued);
+stage_started_rx
+.recv_timeout(std::time::Duration::from_secs(5))
+.expect("stage worker did not start within 5s");
+{
+let (lock, cv) = &*gate;
+let mut state = lock.lock().unwrap();
+while !state.0 {
+state = cv.wait(state).unwrap();
+}
+}
assert_eq!(ctrl.handle_suggest(key1, None), SuggestOutcome::Enqueued);
assert_eq!(
ctrl.handle_suggest(key2, None),
SuggestOutcome::QueueDropped
);
+{
+let (lock, cv) = &*gate;
+let mut state = lock.lock().unwrap();
+state.1 = true;
+cv.notify_all();
+}

let snapshot = ctrl.stats_snapshot();
assert_eq!(snapshot.suggest_enqueued_count, 2);
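
The tests in this file replace fixed `thread::sleep` delays with a two-flag gate built from `Mutex<(bool, bool)>` and `Condvar`. The standalone sketch below shows the same handshake outside the cache controller. The `Gate` type, its method names, and `demo` are hypothetical names introduced here for illustration; the PR inlines the equivalent logic in each test.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical two-flag gate: `.0` = worker has started, `.1` = worker may finish.
#[derive(Clone)]
pub struct Gate(Arc<(Mutex<(bool, bool)>, Condvar)>);

impl Gate {
    pub fn new() -> Self {
        Gate(Arc::new((Mutex::new((false, false)), Condvar::new())))
    }

    /// Worker side: announce "started", then block until released.
    pub fn enter_and_wait(&self) {
        let (lock, cv) = &*self.0;
        let mut state = lock.lock().unwrap();
        state.0 = true;
        cv.notify_all();
        while !state.1 {
            state = cv.wait(state).unwrap();
        }
    }

    /// Test side: block until the worker has announced "started".
    pub fn wait_started(&self) {
        let (lock, cv) = &*self.0;
        let mut state = lock.lock().unwrap();
        while !state.0 {
            state = cv.wait(state).unwrap();
        }
    }

    /// Test side: allow the worker to finish.
    pub fn release(&self) {
        let (lock, cv) = &*self.0;
        lock.lock().unwrap().1 = true;
        cv.notify_all();
    }
}

/// Runs one worker through the gate and returns the observed event order.
pub fn demo() -> Vec<&'static str> {
    let gate = Gate::new();
    let log = Arc::new(Mutex::new(Vec::new()));
    let (worker_gate, worker_log) = (gate.clone(), log.clone());
    let worker = thread::spawn(move || {
        worker_gate.enter_and_wait();
        worker_log.lock().unwrap().push("worker finished");
    });
    gate.wait_started();
    // The worker is guaranteed to be in flight here, with no sleep involved.
    log.lock().unwrap().push("checked while worker in flight");
    gate.release();
    worker.join().unwrap();
    let events = log.lock().unwrap().clone();
    events
}
```

Any assertion that depends on the worker still being in flight belongs between `wait_started` and `release`, which is the ordering `queue_drop_updates_snapshot` uses.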
18 changes: 18 additions & 0 deletions fluxon_rs/fluxon_fs_s3_gateway/src/lib.rs
@@ -5344,6 +5344,9 @@ mod tests {
};
use crate::transfer::encode_transfer_manifest_blob_with_empty_dirs;
use fluxon_fs_core::config::{
+FS_CACHE_DEFAULT_WRITE_SESSION_TARGET_INFLIGHT_BYTES_V1,
+FS_EXPORT_DEFAULT_INLINE_BYTES_MAX_BYTES_V1,
+FS_EXPORT_DEFAULT_METADATA_CACHE_TTL_MS_V1,
FLUXON_FS_LOCAL_TRANSFER_CHECK_DST_EXPORT, FLUXON_FS_LOCAL_TRANSFER_CHECK_SRC_EXPORT,
FluxonFsAccessModel, FluxonFsAccessUser, FluxonFsExport, FluxonFsExportRoutingMode,
FluxonFsGlobalConfig, FluxonFsLocalTransferCheckJobSpecWire, FluxonFsRequestIdentity,
@@ -6242,6 +6245,9 @@ mod tests {
cache_kv_key_prefix: format!("/{}/", name),
cache_bytes_field_key: format!("{}_bytes", name),
cache_max_bytes: 1024,
+inline_bytes_max_bytes: FS_EXPORT_DEFAULT_INLINE_BYTES_MAX_BYTES_V1,
+metadata_cache_ttl_ms: FS_EXPORT_DEFAULT_METADATA_CACHE_TTL_MS_V1,
+async_backfill_enabled: true,
rpc_paths: export_rpc_paths_for_export_name_v1(name),
}
}
@@ -6518,6 +6524,8 @@ mod tests {
access_config.clone(),
Arc::new(FluxonFsGlobalConfig {
stale_window_ms: 0,
+write_session_target_inflight_bytes:
+FS_CACHE_DEFAULT_WRITE_SESSION_TARGET_INFLIGHT_BYTES_V1,
rules: Vec::new(),
exports: BTreeMap::new(),
}),
@@ -6814,6 +6822,8 @@ max-background-jobs = {TEST_TIKV_RAFTDB_MAX_BACKGROUND_JOBS}\n"
test_gateway_access_config(),
Arc::new(FluxonFsGlobalConfig {
stale_window_ms: 0,
+write_session_target_inflight_bytes:
+FS_CACHE_DEFAULT_WRITE_SESSION_TARGET_INFLIGHT_BYTES_V1,
rules: Vec::new(),
exports: BTreeMap::new(),
}),
@@ -6918,6 +6928,8 @@ max-background-jobs = {TEST_TIKV_RAFTDB_MAX_BACKGROUND_JOBS}\n"
}
let fs_cache = FluxonFsGlobalConfig {
stale_window_ms: 0,
+write_session_target_inflight_bytes:
+FS_CACHE_DEFAULT_WRITE_SESSION_TARGET_INFLIGHT_BYTES_V1,
rules: Vec::new(),
exports,
};
@@ -6992,6 +7004,8 @@ max-background-jobs = {TEST_TIKV_RAFTDB_MAX_BACKGROUND_JOBS}\n"
}
let fs_cache = FluxonFsGlobalConfig {
stale_window_ms: 0,
+write_session_target_inflight_bytes:
+FS_CACHE_DEFAULT_WRITE_SESSION_TARGET_INFLIGHT_BYTES_V1,
rules: Vec::new(),
exports,
};
@@ -7038,6 +7052,8 @@ max-background-jobs = {TEST_TIKV_RAFTDB_MAX_BACKGROUND_JOBS}\n"
access_config,
Arc::new(FluxonFsGlobalConfig {
stale_window_ms: 0,
+write_session_target_inflight_bytes:
+FS_CACHE_DEFAULT_WRITE_SESSION_TARGET_INFLIGHT_BYTES_V1,
rules: Vec::new(),
exports: BTreeMap::new(),
}),
@@ -10780,6 +10796,8 @@ max-background-jobs = {TEST_TIKV_RAFTDB_MAX_BACKGROUND_JOBS}\n"

let fs_cache = FluxonFsGlobalConfig {
stale_window_ms: 0,
+write_session_target_inflight_bytes:
+FS_CACHE_DEFAULT_WRITE_SESSION_TARGET_INFLIGHT_BYTES_V1,
rules: Vec::new(),
exports: BTreeMap::new(),
};
Original file line number Diff line number Diff line change
@@ -11,7 +11,12 @@ use limit_thirdparty::tokio::{self};
use std::time::{Duration, Instant};
use tracing::info;

-fn new_master_config(instance_key: &str, port: u16, cluster: &str, etcd: &str) -> MasterConfig {
+fn new_master_config(
+instance_key: &str,
+port: Option<u16>,
+cluster: &str,
+etcd: &str,
+) -> MasterConfig {
let prometheus_base_url = fluxon_util::dev_config::load_tsdb_base_url()
.expect("read prometheus_base_url from build_config_ext.yml (key: prom)");
let prom_remote_write_url =
@@ -24,7 +29,7 @@ fn new_master_config(instance_key: &str, port: u16, cluster: &str, etcd: &str) -
MasterConfig {
instance_key: instance_key.to_string(),
cluster_name: cluster.to_string(),
-port: Some(port),
+port,
etcd_endpoints: vec![etcd.to_string()],
protocol: ProtocolConfig {
protocol_type: ProtocolType::Tcp,
@@ -144,7 +149,7 @@ async fn test_external_client_basic_crud() {
std::fs::create_dir_all(shm_path).unwrap();

// Start master
-let master_cfg = new_master_config("ext_test_master", 50120, cluster, &etcd);
+let master_cfg = new_master_config("ext_test_master", None, cluster, &etcd);
let (master_fw, _) = run_master(ConfigArg::Config(master_cfg))
.await
.expect("start master");
@@ -306,7 +311,7 @@ pub async fn test_external_client_lifetime() {
info!("[ELT-SETUP] cluster='{}', shm_path='{}'", cluster, shm_path);

// Start master
-let master_cfg = new_master_config("ext_lt_master", 50130, cluster, &etcd);
+let master_cfg = new_master_config("ext_lt_master", None, cluster, &etcd);
let (master_fw, _) = run_master(ConfigArg::Config(master_cfg))
.await
.expect("start master");
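
These hunks stop passing the fixed ports 50120 and 50130 and pass `None` instead. The diff does not show how `MasterConfig` interprets `port: None`, so the sketch below is only a general illustration of one common way tests avoid hard-coded ports: binding to port 0 and letting the OS choose. `bind_ephemeral` is a hypothetical helper and is not taken from this codebase.

```rust
use std::net::TcpListener;

/// Hypothetical helper: bind to port 0 so the OS assigns a free port,
/// then report which port was chosen.
pub fn bind_ephemeral() -> std::io::Result<(TcpListener, u16)> {
    // Port 0 asks the OS for any currently unused port on loopback.
    let listener = TcpListener::bind(("127.0.0.1", 0))?;
    let port = listener.local_addr()?.port();
    // The listener is returned so the port stays reserved while it is held.
    Ok((listener, port))
}
```

Two tests that each call `bind_ephemeral` while holding their listeners receive different ports, which removes the collision risk that fixed values carry when tests run in parallel.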