Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/llm/language_model/legacy/servable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,12 @@ absl::Status LegacyServable::parseRequest(std::shared_ptr<GenAiServableExecution
!legacyExecutionContext->apiHandler->getRequest().skipSpecialTokens) {
streamerConfig.insert(ov::genai::skip_special_tokens(false));
}
auto ovmsCallback = [& ctx = *legacyExecutionContext](rapidjson::Document delta) -> ov::genai::StreamingStatus {
auto ovmsCallback = [& ctx = *legacyExecutionContext](rapidjson::Document delta, bool isLast) -> ov::genai::StreamingStatus {
if (ctx.clientDisconnected.load()) {
ctx.deltaChannel.signalComplete();
return ov::genai::StreamingStatus::CANCEL;
}
ctx.deltaChannel.push(std::move(delta));
ctx.deltaChannel.push(std::move(delta), isLast);
return ov::genai::StreamingStatus::RUNNING;
};
legacyExecutionContext->textStreamer = std::make_shared<OVMSTextStreamer>(
Expand Down
13 changes: 7 additions & 6 deletions src/llm/ovms_text_streamer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,15 @@ ov::genai::StreamingStatus OVMSTextStreamer::flush_chunk(
delta = std::move(doc);
}

const bool isLast = (finish_reason != ov::genai::GenerationFinishReason::NONE);
if (delta.has_value()) {
return m_callback(std::move(*delta));
return m_callback(std::move(*delta), isLast);
}
if (finish_reason != ov::genai::GenerationFinishReason::NONE) {
// Parser produced no delta for the final flush (e.g. generation ended
// on a special token the parser absorbed). Still fire the callback with
// an empty Document so preparePartialResponse can emit the finish_reason.
return m_callback(rapidjson::Document{});
if (isLast) {
// Parser produced no delta for the final flush (e.g. generation ended on a
// special token the parser absorbed). Still fire the callback with an empty
// Document so the caller can emit the finish_reason chunk.
return m_callback(rapidjson::Document{}, true);
}
Comment on lines +203 to 208
return ov::genai::StreamingStatus::RUNNING;
}
Expand Down
7 changes: 5 additions & 2 deletions src/llm/ovms_text_streamer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,14 @@ namespace ovms {
// fires the callback unconditionally, preserving existing behavior.
class OVMSTextStreamer : public ov::genai::TextStreamer {
public:
// Callback receives a Document and returns the streaming status.
// Callback receives a Document and the isLast flag, and returns the streaming status.
// Document shape is always {"delta":{...}} matching the OpenAI delta format.
// For the finish-only case (nullopt from parseChunk + STOP finishReason),
// an empty Document{} is passed so the caller can emit the finish_reason chunk.
using Callback = std::function<ov::genai::StreamingStatus(rapidjson::Document)>;
// isLast is true when finish_reason != NONE — callers that push into a DeltaChannel
// should forward this flag to DeltaChannel::push() so the final document and the
// completion signal are observed atomically (no separate signalComplete() needed).
using Callback = std::function<ov::genai::StreamingStatus(rapidjson::Document, bool /*isLast*/)>;

// outputParser may be nullptr (e.g. for the unary VLM path).
// TODO(phase3): rework ownership — OVMSTextStreamer should not need to keep
Expand Down
4 changes: 2 additions & 2 deletions src/llm/servable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ absl::Status GenAiServable::parseRequest(std::shared_ptr<GenAiServableExecutionC
}

if (executionContext->apiHandler->isStream()) {
auto ovmsCallback = [& ctx = *executionContext](rapidjson::Document delta) -> ov::genai::StreamingStatus {
ctx.deltaChannel.push(std::move(delta));
auto ovmsCallback = [& ctx = *executionContext](rapidjson::Document delta, bool isLast) -> ov::genai::StreamingStatus {
ctx.deltaChannel.push(std::move(delta), isLast);
return ov::genai::StreamingStatus::RUNNING;
};
ov::AnyMap streamerConfig;
Expand Down
9 changes: 7 additions & 2 deletions src/llm/servable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,21 @@ enum class ChatTemplateMode {
// thread, so the mutex is acquired but uncontested.
struct DeltaChannel {
// Push a delta from any thread (streamer callback).
void push(rapidjson::Document delta) {
// When isLast is true, also marks the channel complete atomically so consumers
// always see the final document and the completion flag in the same observation.
void push(rapidjson::Document delta, bool isLast = false) {
{
std::lock_guard<std::mutex> lock(m_mutex);
m_deltas.push_back(std::move(delta));
if (isLast)
m_complete = true;
}
Comment on lines +84 to 92
m_cv.notify_one();
}

// Signal that no more deltas will be pushed (generation complete or cancelled).
// May be called from any thread.
// May be called from any thread. Also acts as a safety-net for paths where
// push(delta, isLast=true) may not fire (e.g. client disconnection mid-stream).
void signalComplete() {
{
std::lock_guard<std::mutex> lock(m_mutex);
Expand Down
6 changes: 3 additions & 3 deletions src/llm/visual_language_model/legacy/servable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,12 @@ absl::Status VisualLanguageModelLegacyServable::parseRequest(std::shared_ptr<Gen
!legacyExecutionContext->apiHandler->getRequest().skipSpecialTokens) {
streamerConfig.insert(ov::genai::skip_special_tokens(false));
}
auto ovmsCallback = [& ctx = *legacyExecutionContext](rapidjson::Document delta) -> ov::genai::StreamingStatus {
auto ovmsCallback = [& ctx = *legacyExecutionContext](rapidjson::Document delta, bool isLast) -> ov::genai::StreamingStatus {
if (ctx.clientDisconnected.load()) {
ctx.deltaChannel.signalComplete();
return ov::genai::StreamingStatus::CANCEL;
}
ctx.deltaChannel.push(std::move(delta));
ctx.deltaChannel.push(std::move(delta), isLast);
return ov::genai::StreamingStatus::RUNNING;
};
legacyExecutionContext->textStreamer = std::make_shared<OVMSTextStreamer>(
Expand Down Expand Up @@ -155,7 +155,7 @@ absl::Status VisualLanguageModelLegacyServable::parseRequest(std::shared_ptr<Gen
!legacyExecutionContext->apiHandler->getRequest().skipSpecialTokens) {
streamerConfig.insert(ov::genai::skip_special_tokens(false));
}
auto unaryCallback = [& ctx = *legacyExecutionContext](rapidjson::Document delta) -> ov::genai::StreamingStatus {
auto unaryCallback = [& ctx = *legacyExecutionContext](rapidjson::Document delta, bool /*isLast*/) -> ov::genai::StreamingStatus {
if (delta.HasMember("delta") && delta["delta"].IsObject() &&
delta["delta"].HasMember("content") && delta["delta"]["content"].IsString()) {
ctx.accumulatedUnaryText += delta["delta"]["content"].GetString();
Expand Down