@@ -1362,7 +1362,7 @@ channel_collapse(channel_T *channel, int part)
13621362 * Returns OK or FAIL.
13631363 */
13641364 static int
1365- channel_save (channel_T * channel , int part , char_u * buf , int len )
1365+ channel_save (channel_T * channel , int part , char_u * buf , int len , char * lead )
13661366{
13671367 readq_T * node ;
13681368 readq_T * head = & channel -> ch_part [part ].ch_head ;
@@ -1403,9 +1403,9 @@ channel_save(channel_T *channel, int part, char_u *buf, int len)
14031403 head -> rq_prev -> rq_next = node ;
14041404 head -> rq_prev = node ;
14051405
1406- if (log_fd != NULL )
1406+ if (log_fd != NULL && lead != NULL )
14071407 {
1408- ch_log_lead ("RECV " , channel );
1408+ ch_log_lead (lead , channel );
14091409 fprintf (log_fd , "'" );
14101410 if (fwrite (buf , len , 1 , log_fd ) != 1 )
14111411 return FAIL ;
@@ -1415,7 +1415,7 @@ channel_save(channel_T *channel, int part, char_u *buf, int len)
14151415}
14161416
14171417/*
1418- * Use the read buffer of "channel"/"part" and parse a JSON messages that is
1418+ * Use the read buffer of "channel"/"part" and parse a JSON message that is
14191419 * complete. The messages are added to the queue.
14201420 * Return TRUE if there is more to read.
14211421 */
@@ -1425,7 +1425,9 @@ channel_parse_json(channel_T *channel, int part)
14251425 js_read_T reader ;
14261426 typval_T listtv ;
14271427 jsonq_T * item ;
1428- jsonq_T * head = & channel -> ch_part [part ].ch_json_head ;
1428+ chanpart_T * chanpart = & channel -> ch_part [part ];
1429+ jsonq_T * head = & chanpart -> ch_json_head ;
1430+ int status ;
14291431 int ret ;
14301432
14311433 if (channel_peek (channel , part ) == NULL )
@@ -1438,15 +1440,23 @@ channel_parse_json(channel_T *channel, int part)
14381440 reader .js_fill = NULL ;
14391441 /* reader.js_fill = channel_fill; */
14401442 reader .js_cookie = channel ;
1441- ret = json_decode (& reader , & listtv ,
1442- channel -> ch_part [part ].ch_mode == MODE_JS ? JSON_JS : 0 );
1443- if (ret == OK )
1443+
1444+ /* When a message is incomplete we wait for a short while for more to
1445+ * arrive. After the delay drop the input, otherwise a truncated string
1446+ * or list will make us hang. */
1447+ status = json_decode (& reader , & listtv ,
1448+ chanpart -> ch_mode == MODE_JS ? JSON_JS : 0 );
1449+ if (status == OK )
14441450 {
14451451 /* Only accept the response when it is a list with at least two
14461452 * items. */
14471453 if (listtv .v_type != VAR_LIST || listtv .vval .v_list -> lv_len < 2 )
14481454 {
1449- /* TODO: give error */
1455+ if (listtv .v_type != VAR_LIST )
1456+ ch_error (channel , "Did not receive a list, discarding" );
1457+ else
1458+ ch_errorn (channel , "Expected list with two items, got %d" ,
1459+ listtv .vval .v_list -> lv_len );
14501460 clear_tv (& listtv );
14511461 }
14521462 else
@@ -1477,22 +1487,71 @@ channel_parse_json(channel_T *channel, int part)
14771487 }
14781488 }
14791489
1480- /* Put the unread part back into the channel.
1481- * TODO: insert in front */
1482- if (reader . js_buf [ reader . js_used ] != NUL )
1490+ if ( status == OK )
1491+ chanpart -> ch_waiting = FALSE;
1492+ else if (status == MAYBE )
14831493 {
1484- if (ret == FAIL )
1494+ if (! chanpart -> ch_waiting )
14851495 {
1486- ch_error (channel , "Decoding failed - discarding input" );
1487- ret = FALSE;
1496+ /* First time encountering incomplete message, set a deadline of
1497+ * 100 msec. */
1498+ ch_log (channel , "Incomplete message - wait for more" );
1499+ reader .js_used = 0 ;
1500+ chanpart -> ch_waiting = TRUE;
1501+ #ifdef WIN32
1502+ chanpart -> ch_deadline = GetTickCount () + 100L ;
1503+ #else
1504+ gettimeofday (& chanpart -> ch_deadline , NULL );
1505+ chanpart -> ch_deadline .tv_usec += 100 * 1000 ;
1506+ if (chanpart -> ch_deadline .tv_usec > 1000 * 1000 )
1507+ {
1508+ chanpart -> ch_deadline .tv_usec -= 1000 * 1000 ;
1509+ ++ chanpart -> ch_deadline .tv_sec ;
1510+ }
1511+ #endif
14881512 }
14891513 else
14901514 {
1491- channel_save (channel , part , reader .js_buf + reader .js_used ,
1492- (int )(reader .js_end - reader .js_buf ) - reader .js_used );
1493- ret = TRUE;
1515+ int timeout ;
1516+ #ifdef WIN32
1517+ timeout = GetTickCount () > chanpart -> ch_deadline ;
1518+ #else
1519+ {
1520+ struct timeval now_tv ;
1521+
1522+ gettimeofday (& now_tv , NULL );
1523+ timeout = now_tv .tv_sec > chanpart -> ch_deadline .tv_sec
1524+ || (now_tv .tv_sec == chanpart -> ch_deadline .tv_sec
1525+ && now_tv .tv_usec > chanpart -> ch_deadline .tv_usec );
1526+ }
1527+ #endif
1528+ if (timeout )
1529+ {
1530+ status = FAIL ;
1531+ chanpart -> ch_waiting = FALSE;
1532+ }
1533+ else
1534+ {
1535+ reader .js_used = 0 ;
1536+ ch_log (channel , "still waiting on incomplete message" );
1537+ }
14941538 }
14951539 }
1540+
1541+ if (status == FAIL )
1542+ {
1543+ ch_error (channel , "Decoding failed - discarding input" );
1544+ ret = FALSE;
1545+ chanpart -> ch_waiting = FALSE;
1546+ }
1547+ else if (reader .js_buf [reader .js_used ] != NUL )
1548+ {
1549+ /* Put the unread part back into the channel.
1550+ * TODO: insert in front */
1551+ channel_save (channel , part , reader .js_buf + reader .js_used ,
1552+ (int )(reader .js_end - reader .js_buf ) - reader .js_used , NULL );
1553+ ret = status == MAYBE ? FALSE: TRUE;
1554+ }
14961555 else
14971556 ret = FALSE;
14981557
@@ -1559,6 +1618,8 @@ channel_get_json(channel_T *channel, int part, int id, typval_T **rettv)
15591618 || tv -> vval .v_number != channel -> ch_part [part ].ch_block_id )))
15601619 {
15611620 * rettv = item -> jq_value ;
1621+ if (tv -> v_type == VAR_NUMBER )
1622+ ch_logn (channel , "Getting JSON message %d" , tv -> vval .v_number );
15621623 remove_json_node (head , item );
15631624 return OK ;
15641625 }
@@ -2289,7 +2350,7 @@ channel_read(channel_T *channel, int part, char *func)
22892350 break ; /* error or nothing more to read */
22902351
22912352 /* Store the read message in the queue. */
2292- channel_save (channel , part , buf , len );
2353+ channel_save (channel , part , buf , len , "RECV " );
22932354 readlen += len ;
22942355 if (len < MAXMSGSIZE )
22952356 break ; /* did read everything that's available */
@@ -2316,7 +2377,7 @@ channel_read(channel_T *channel, int part, char *func)
23162377 if (channel -> ch_part [part ].ch_mode == MODE_RAW
23172378 || channel -> ch_part [part ].ch_mode == MODE_NL )
23182379 channel_save (channel , part , (char_u * )DETACH_MSG_RAW ,
2319- (int )STRLEN (DETACH_MSG_RAW ));
2380+ (int )STRLEN (DETACH_MSG_RAW ), "PUT " );
23202381
23212382 /* TODO: When reading from stdout is not possible, should we try to
23222383 * keep stdin and stderr open? Probably not, assume the other side
@@ -2361,9 +2422,13 @@ channel_read_block(channel_T *channel, int part, int timeout)
23612422 continue ;
23622423
23632424 /* Wait for up to the channel timeout. */
2364- if (fd == INVALID_FD
2365- || channel_wait (channel , fd , timeout ) == FAIL )
2425+ if (fd == INVALID_FD )
2426+ return NULL ;
2427+ if (channel_wait (channel , fd , timeout ) == FAIL )
2428+ {
2429+ ch_log (channel , "Timed out" );
23662430 return NULL ;
2431+ }
23672432 channel_read (channel , part , "channel_read_block" );
23682433 }
23692434
@@ -2403,24 +2468,26 @@ channel_read_block(channel_T *channel, int part, int timeout)
24032468channel_read_json_block (
24042469 channel_T * channel ,
24052470 int part ,
2406- int timeout ,
2471+ int timeout_arg ,
24072472 int id ,
24082473 typval_T * * rettv )
24092474{
24102475 int more ;
24112476 sock_T fd ;
2477+ int timeout ;
2478+ chanpart_T * chanpart = & channel -> ch_part [part ];
24122479
24132480 ch_log (channel , "Reading JSON" );
24142481 if (id != -1 )
2415- channel -> ch_part [ part ]. ch_block_id = id ;
2482+ chanpart -> ch_block_id = id ;
24162483 for (;;)
24172484 {
24182485 more = channel_parse_json (channel , part );
24192486
24202487 /* search for messsage "id" */
24212488 if (channel_get_json (channel , part , id , rettv ) == OK )
24222489 {
2423- channel -> ch_part [ part ]. ch_block_id = 0 ;
2490+ chanpart -> ch_block_id = 0 ;
24242491 return OK ;
24252492 }
24262493
@@ -2431,14 +2498,50 @@ channel_read_json_block(
24312498 if (channel_parse_messages ())
24322499 continue ;
24332500
2434- /* Wait for up to the timeout. */
2435- fd = channel -> ch_part [part ].ch_fd ;
2501+ /* Wait for up to the timeout. If there was an incomplete message
2502+ * use the deadline for that. */
2503+ timeout = timeout_arg ;
2504+ if (chanpart -> ch_waiting )
2505+ {
2506+ #ifdef WIN32
2507+ timeout = chanpart -> ch_deadline - GetTickCount () + 1 ;
2508+ #else
2509+ {
2510+ struct timeval now_tv ;
2511+
2512+ gettimeofday (& now_tv , NULL );
2513+ timeout = (chanpart -> ch_deadline .tv_sec
2514+ - now_tv .tv_sec ) * 1000
2515+ + (chanpart -> ch_deadline .tv_usec
2516+ - now_tv .tv_usec ) / 1000
2517+ + 1 ;
2518+ }
2519+ #endif
2520+ if (timeout < 0 )
2521+ {
2522+ /* Something went wrong, channel_parse_json() didn't
2523+ * discard message. Cancel waiting. */
2524+ chanpart -> ch_waiting = FALSE;
2525+ timeout = timeout_arg ;
2526+ }
2527+ else if (timeout > timeout_arg )
2528+ timeout = timeout_arg ;
2529+ }
2530+ fd = chanpart -> ch_fd ;
24362531 if (fd == INVALID_FD || channel_wait (channel , fd , timeout ) == FAIL )
2437- break ;
2438- channel_read (channel , part , "channel_read_json_block" );
2532+ {
2533+ if (timeout == timeout_arg )
2534+ {
2535+ if (fd != INVALID_FD )
2536+ ch_log (channel , "Timed out" );
2537+ break ;
2538+ }
2539+ }
2540+ else
2541+ channel_read (channel , part , "channel_read_json_block" );
24392542 }
24402543 }
2441- channel -> ch_part [ part ]. ch_block_id = 0 ;
2544+ chanpart -> ch_block_id = 0 ;
24422545 return FAIL ;
24432546}
24442547
0 commit comments