@@ -1394,7 +1394,7 @@ can_write_buf_line(channel_T *channel)
13941394}
13951395
13961396/*
1397- * Write any lines to the input channel.
1397+ * Write any buffer lines to the input channel.
13981398 */
13991399 static void
14001400channel_write_in (channel_T * channel )
@@ -1466,6 +1466,25 @@ channel_buffer_free(buf_T *buf)
14661466 }
14671467}
14681468
1469+ /*
1470+ * Write any lines waiting to be written to "channel".
1471+ */
1472+ static void
1473+ channel_write_input (channel_T * channel )
1474+ {
1475+ chanpart_T * in_part = & channel -> ch_part [PART_IN ];
1476+
1477+ if (in_part -> ch_writeque .wq_next != NULL )
1478+ channel_send (channel , PART_IN , (char_u * )"" , 0 , "channel_write_input" );
1479+ else if (in_part -> ch_bufref .br_buf != NULL )
1480+ {
1481+ if (in_part -> ch_buf_append )
1482+ channel_write_new_lines (in_part -> ch_bufref .br_buf );
1483+ else
1484+ channel_write_in (channel );
1485+ }
1486+ }
1487+
14691488/*
14701489 * Write any lines waiting to be written to a channel.
14711490 */
@@ -1475,17 +1494,7 @@ channel_write_any_lines(void)
14751494 channel_T * channel ;
14761495
14771496 for (channel = first_channel ; channel != NULL ; channel = channel -> ch_next )
1478- {
1479- chanpart_T * in_part = & channel -> ch_part [PART_IN ];
1480-
1481- if (in_part -> ch_bufref .br_buf != NULL )
1482- {
1483- if (in_part -> ch_buf_append )
1484- channel_write_new_lines (in_part -> ch_bufref .br_buf );
1485- else
1486- channel_write_in (channel );
1487- }
1488- }
1497+ channel_write_input (channel );
14891498}
14901499
14911500/*
@@ -3005,7 +3014,9 @@ channel_fill_wfds(int maxfd_arg, fd_set *wfds)
30053014 {
30063015 chanpart_T * in_part = & ch -> ch_part [PART_IN ];
30073016
3008- if (in_part -> ch_fd != INVALID_FD && in_part -> ch_bufref .br_buf != NULL )
3017+ if (in_part -> ch_fd != INVALID_FD
3018+ && (in_part -> ch_bufref .br_buf != NULL
3019+ || in_part -> ch_writeque .wq_next != NULL ))
30093020 {
30103021 FD_SET ((int )in_part -> ch_fd , wfds );
30113022 if ((int )in_part -> ch_fd >= maxfd )
@@ -3554,6 +3565,29 @@ channel_handle_events(void)
35543565}
35553566# endif
35563567
3568+ /*
3569+ * Set "channel"/"part" to non-blocking.
3570+ * Only works for sockets and pipes.
3571+ */
3572+ void
3573+ channel_set_nonblock (channel_T * channel , ch_part_T part )
3574+ {
3575+ chanpart_T * ch_part = & channel -> ch_part [part ];
3576+ int fd = ch_part -> ch_fd ;
3577+
3578+ if (fd != INVALID_FD )
3579+ {
3580+ #ifdef _WIN32
3581+ u_long val = 1 ;
3582+
3583+ ioctlsocket (fd , FIONBIO , & val );
3584+ #else
3585+ fcntl (fd , F_SETFL , O_NONBLOCK );
3586+ #endif
3587+ ch_part -> ch_nonblocking = TRUE;
3588+ }
3589+ }
3590+
35573591/*
35583592 * Write "buf" (NUL terminated string) to "channel"/"part".
35593593 * When "fun" is not NULL an error message might be given.
@@ -3563,14 +3597,16 @@ channel_handle_events(void)
35633597channel_send (
35643598 channel_T * channel ,
35653599 ch_part_T part ,
3566- char_u * buf ,
3567- int len ,
3600+ char_u * buf_arg ,
3601+ int len_arg ,
35683602 char * fun )
35693603{
35703604 int res ;
35713605 sock_T fd ;
3606+ chanpart_T * ch_part = & channel -> ch_part [part ];
3607+ int did_use_queue = FALSE;
35723608
3573- fd = channel -> ch_part [ part ]. ch_fd ;
3609+ fd = ch_part -> ch_fd ;
35743610 if (fd == INVALID_FD )
35753611 {
35763612 if (!channel -> ch_error && fun != NULL )
@@ -3586,29 +3622,144 @@ channel_send(
35863622 {
35873623 ch_log_lead ("SEND " , channel );
35883624 fprintf (log_fd , "'" );
3589- ignored = (int )fwrite (buf , len , 1 , log_fd );
3625+ ignored = (int )fwrite (buf_arg , len_arg , 1 , log_fd );
35903626 fprintf (log_fd , "'\n" );
35913627 fflush (log_fd );
35923628 did_log_msg = TRUE;
35933629 }
35943630
3595- if (part == PART_SOCK )
3596- res = sock_write (fd , (char * )buf , len );
3597- else
3598- res = fd_write (fd , (char * )buf , len );
3599- if (res != len )
3631+ for (;;)
36003632 {
3601- if (!channel -> ch_error && fun != NULL )
3633+ writeq_T * wq = & ch_part -> ch_writeque ;
3634+ char_u * buf ;
3635+ int len ;
3636+
3637+ if (wq -> wq_next != NULL )
36023638 {
3603- ch_error (channel , "%s(): write failed" , fun );
3604- EMSG2 (_ ("E631: %s(): write failed" ), fun );
3639+ /* first write what was queued */
3640+ buf = wq -> wq_next -> wq_ga .ga_data ;
3641+ len = wq -> wq_next -> wq_ga .ga_len ;
3642+ did_use_queue = TRUE;
3643+ }
3644+ else
3645+ {
3646+ if (len_arg == 0 )
3647+ /* nothing to write, called from channel_select_check() */
3648+ return OK ;
3649+ buf = buf_arg ;
3650+ len = len_arg ;
36053651 }
3606- channel -> ch_error = TRUE;
3607- return FAIL ;
3608- }
36093652
3610- channel -> ch_error = FALSE;
3611- return OK ;
3653+ if (part == PART_SOCK )
3654+ res = sock_write (fd , (char * )buf , len );
3655+ else
3656+ res = fd_write (fd , (char * )buf , len );
3657+ if (res < 0 && (errno == EWOULDBLOCK
3658+ #ifdef EAGAIN
3659+ || errno == EAGAIN
3660+ #endif
3661+ ))
3662+ res = 0 ; /* nothing got written */
3663+
3664+ if (res >= 0 && ch_part -> ch_nonblocking )
3665+ {
3666+ writeq_T * entry = wq -> wq_next ;
3667+
3668+ if (did_use_queue )
3669+ ch_log (channel , "Sent %d bytes now" , res );
3670+ if (res == len )
3671+ {
3672+ /* Wrote all the buf[len] bytes. */
3673+ if (entry != NULL )
3674+ {
3675+ /* Remove the entry from the write queue. */
3676+ ga_clear (& entry -> wq_ga );
3677+ wq -> wq_next = entry -> wq_next ;
3678+ if (wq -> wq_next == NULL )
3679+ wq -> wq_prev = NULL ;
3680+ else
3681+ wq -> wq_next -> wq_prev = NULL ;
3682+ continue ;
3683+ }
3684+ if (did_use_queue )
3685+ ch_log (channel , "Write queue empty" );
3686+ }
3687+ else
3688+ {
3689+ /* Wrote only buf[res] bytes, can't write more now. */
3690+ if (entry != NULL )
3691+ {
3692+ if (res > 0 )
3693+ {
3694+ /* Remove the bytes that were written. */
3695+ mch_memmove (entry -> wq_ga .ga_data ,
3696+ (char * )entry -> wq_ga .ga_data + res ,
3697+ len - res );
3698+ entry -> wq_ga .ga_len -= res ;
3699+ }
3700+ buf = buf_arg ;
3701+ len = len_arg ;
3702+ }
3703+ else
3704+ {
3705+ buf += res ;
3706+ len -= res ;
3707+ }
3708+ ch_log (channel , "Adding %d bytes to the write queue" , len );
3709+
3710+ /* Append the not written bytes of the argument to the write
3711+ * buffer. Limit entries to 4000 bytes. */
3712+ if (wq -> wq_prev != NULL
3713+ && wq -> wq_prev -> wq_ga .ga_len + len < 4000 )
3714+ {
3715+ writeq_T * last = wq -> wq_prev ;
3716+
3717+ /* append to the last entry */
3718+ if (ga_grow (& last -> wq_ga , len ) == OK )
3719+ {
3720+ mch_memmove ((char * )last -> wq_ga .ga_data
3721+ + last -> wq_ga .ga_len ,
3722+ buf , len );
3723+ last -> wq_ga .ga_len += len ;
3724+ }
3725+ }
3726+ else
3727+ {
3728+ writeq_T * last = (writeq_T * )alloc ((int )sizeof (writeq_T ));
3729+
3730+ if (last != NULL )
3731+ {
3732+ last -> wq_prev = wq -> wq_prev ;
3733+ last -> wq_next = NULL ;
3734+ if (wq -> wq_prev == NULL )
3735+ wq -> wq_next = last ;
3736+ else
3737+ wq -> wq_prev -> wq_next = last ;
3738+ wq -> wq_prev = last ;
3739+ ga_init2 (& last -> wq_ga , 1 , 1000 );
3740+ if (ga_grow (& last -> wq_ga , len ) == OK )
3741+ {
3742+ mch_memmove (last -> wq_ga .ga_data , buf , len );
3743+ last -> wq_ga .ga_len = len ;
3744+ }
3745+ }
3746+ }
3747+ }
3748+ }
3749+ else if (res != len )
3750+ {
3751+ if (!channel -> ch_error && fun != NULL )
3752+ {
3753+ ch_error (channel , "%s(): write failed" , fun );
3754+ EMSG2 (_ ("E631: %s(): write failed" ), fun );
3755+ }
3756+ channel -> ch_error = TRUE;
3757+ return FAIL ;
3758+ }
3759+
3760+ channel -> ch_error = FALSE;
3761+ return OK ;
3762+ }
36123763}
36133764
36143765/*
@@ -3898,13 +4049,7 @@ channel_select_check(int ret_in, void *rfds_in, void *wfds_in)
38984049 if (ret > 0 && in_part -> ch_fd != INVALID_FD
38994050 && FD_ISSET (in_part -> ch_fd , wfds ))
39004051 {
3901- if (in_part -> ch_buf_append )
3902- {
3903- if (in_part -> ch_bufref .br_buf != NULL )
3904- channel_write_new_lines (in_part -> ch_bufref .br_buf );
3905- }
3906- else
3907- channel_write_in (channel );
4052+ channel_write_input (channel );
39084053 -- ret ;
39094054 }
39104055 }
0 commit comments