11import { NodeHttpServer } from "@effect/platform-node"
22import Http from "node:http"
33import { describe , expect } from "bun:test"
4- import { Effect } from "effect"
4+ import { Context , Effect , Layer , Queue } from "effect"
55import { HttpServer , HttpServerRequest , HttpServerResponse } from "effect/unstable/http"
6+ import * as Socket from "effect/unstable/socket/Socket"
67import { HttpApiProxy } from "../../src/server/routes/instance/httpapi/middleware/proxy"
78import { testEffect } from "../lib/effect"
89
910function serverUrl ( ) {
11+ return HttpServer . HttpServer . use ( ( server ) => Effect . succeed ( HttpServer . formatAddress ( server . address ) ) )
12+ }
13+
14+ const testServerLayer = Layer . mergeAll (
15+ NodeHttpServer . layer ( Http . createServer , { host : "127.0.0.1" , port : 0 } ) ,
16+ Socket . layerWebSocketConstructorGlobal ,
17+ )
18+ const it = testEffect ( testServerLayer )
19+
20+ type TestHandler < E , R > = (
21+ request : HttpServerRequest . HttpServerRequest ,
22+ ) => Effect . Effect < HttpServerResponse . HttpServerResponse , E , R >
23+
24+ function listenServer < E , R > ( handler : TestHandler < E , R > ) {
1025 return Effect . gen ( function * ( ) {
11- return HttpServer . formatAddress ( ( yield * HttpServer . HttpServer ) . address )
26+ yield * HttpServer . serveEffect ( ) ( HttpServerRequest . HttpServerRequest . use ( handler ) )
27+ return yield * serverUrl ( )
1228 } )
1329}
1430
15- const testServerLayer = NodeHttpServer . layer ( Http . createServer , { host : "127.0.0.1" , port : 0 } )
16- const it = testEffect ( testServerLayer )
31+ function listenTestServer < E , R > ( handler : TestHandler < E , R > ) {
32+ return Effect . gen ( function * ( ) {
33+ // Build into the current test scope so the listener stays alive until the
34+ // test finishes. Using Effect.provide here would release it immediately.
35+ const context = yield * Layer . build ( NodeHttpServer . layer ( Http . createServer , { host : "127.0.0.1" , port : 0 } ) )
36+ const server = Context . get ( context , HttpServer . HttpServer )
37+ yield * server . serve ( HttpServerRequest . HttpServerRequest . use ( handler ) )
38+ return HttpServer . formatAddress ( server . address )
39+ } )
40+ }
41+
42+ function echoWebSocket ( request : HttpServerRequest . HttpServerRequest ) {
43+ return Effect . gen ( function * ( ) {
44+ const socket = yield * Effect . orDie ( request . upgrade )
45+ const write = yield * socket . writer
46+ // The upstream announces the negotiated protocol, then echoes every
47+ // received frame. The assertions use those messages to prove proxy flow.
48+ yield * socket
49+ . runRaw ( ( message ) => write ( `echo:${ message } ` ) , {
50+ onOpen : write ( `protocol:${ request . headers [ "sec-websocket-protocol" ] ?? "none" } ` ) . pipe (
51+ Effect . catch ( ( ) => Effect . void ) ,
52+ ) ,
53+ } )
54+ . pipe ( Effect . catch ( ( ) => Effect . void ) )
55+ return HttpServerResponse . empty ( )
56+ } )
57+ }
1758
1859describe ( "HttpApi workspace proxy" , ( ) => {
1960 it . live ( "proxies HTTP request and returns streamed response with status and headers" , ( ) =>
2061 Effect . gen ( function * ( ) {
21- yield * HttpServer . serveEffect ( ) (
22- Effect . gen ( function * ( ) {
23- const req = yield * HttpServerRequest . HttpServerRequest
62+ const url = yield * listenServer (
63+ Effect . fnUntraced ( function * ( req : HttpServerRequest . HttpServerRequest ) {
2464 const body = yield * req . text
2565 return yield * HttpServerResponse . json (
2666 { path : req . url , method : req . method , body } ,
@@ -35,7 +75,6 @@ describe("HttpApi workspace proxy", () => {
3575 )
3676 } ) ,
3777 )
38- const url = yield * serverUrl ( )
3978
4079 const request = HttpServerRequest . fromWeb (
4180 new Request ( "http://localhost/session/abc" , { method : "POST" , body : "request-body" } ) ,
@@ -67,14 +106,12 @@ describe("HttpApi workspace proxy", () => {
67106 it . live ( "strips opencode-internal headers and merges extra headers" , ( ) =>
68107 Effect . gen ( function * ( ) {
69108 let forwarded : Record < string , string > = { }
70- yield * HttpServer . serveEffect ( ) (
71- Effect . gen ( function * ( ) {
72- const req = yield * HttpServerRequest . HttpServerRequest
109+ const url = yield * listenServer ( ( req ) =>
110+ Effect . sync ( ( ) => {
73111 forwarded = req . headers
74112 return HttpServerResponse . empty ( )
75113 } ) ,
76114 )
77- const url = yield * serverUrl ( )
78115
79116 const request = HttpServerRequest . fromWeb (
80117 new Request ( "http://localhost/test" , {
@@ -93,4 +130,26 @@ describe("HttpApi workspace proxy", () => {
93130 expect ( forwarded [ "x-injected" ] ) . toBe ( "extra" )
94131 } ) ,
95132 )
133+
134+ it . live ( "proxies websocket messages and protocols" , ( ) =>
135+ Effect . gen ( function * ( ) {
136+ const upstreamUrl = yield * listenTestServer ( echoWebSocket )
137+
138+ // Client -> proxy listener -> HttpApiProxy.websocket -> upstream listener.
139+ // The client never connects to upstream directly.
140+ const proxyUrl = yield * listenServer ( ( request ) => HttpApiProxy . websocket ( request , `${ upstreamUrl } /echo` ) )
141+
142+ const socket = yield * Socket . makeWebSocket ( `${ proxyUrl . replace ( / ^ h t t p / , "ws" ) } /proxy` , {
143+ closeCodeIsError : ( ) => false ,
144+ protocols : "chat" ,
145+ } )
146+ const messages = yield * Queue . unbounded < string > ( )
147+ yield * socket . runRaw ( ( message ) => Queue . offer ( messages , String ( message ) ) ) . pipe ( Effect . forkScoped )
148+ const write = yield * socket . writer
149+
150+ expect ( yield * Queue . take ( messages ) ) . toBe ( "protocol:chat" )
151+ yield * write ( "hello" )
152+ expect ( yield * Queue . take ( messages ) ) . toBe ( "echo:hello" )
153+ } ) ,
154+ )
96155} )
0 commit comments