@@ -1373,7 +1373,7 @@ can_write_buf_line(channel_T *channel)
13731373}
13741374
13751375/*
1376- * Write any lines to the input channel.
1376+ * Write any buffer lines to the input channel.
13771377 */
13781378 static void
13791379channel_write_in (channel_T * channel )
@@ -1445,6 +1445,25 @@ channel_buffer_free(buf_T *buf)
14451445 }
14461446}
14471447
1448+ /*
1449+ * Write any lines waiting to be written to "channel".
1450+ */
1451+ static void
1452+ channel_write_input (channel_T * channel )
1453+ {
1454+ chanpart_T * in_part = & channel -> ch_part [PART_IN ];
1455+
1456+ if (in_part -> ch_writeque .wq_next != NULL )
1457+ channel_send (channel , PART_IN , (char_u * )"" , 0 , "channel_write_input" );
1458+ else if (in_part -> ch_bufref .br_buf != NULL )
1459+ {
1460+ if (in_part -> ch_buf_append )
1461+ channel_write_new_lines (in_part -> ch_bufref .br_buf );
1462+ else
1463+ channel_write_in (channel );
1464+ }
1465+ }
1466+
14481467/*
14491468 * Write any lines waiting to be written to a channel.
14501469 */
@@ -1454,17 +1473,7 @@ channel_write_any_lines(void)
14541473 channel_T * channel ;
14551474
14561475 for (channel = first_channel ; channel != NULL ; channel = channel -> ch_next )
1457- {
1458- chanpart_T * in_part = & channel -> ch_part [PART_IN ];
1459-
1460- if (in_part -> ch_bufref .br_buf != NULL )
1461- {
1462- if (in_part -> ch_buf_append )
1463- channel_write_new_lines (in_part -> ch_bufref .br_buf );
1464- else
1465- channel_write_in (channel );
1466- }
1467- }
1476+ channel_write_input (channel );
14681477}
14691478
14701479/*
@@ -2984,7 +2993,9 @@ channel_fill_wfds(int maxfd_arg, fd_set *wfds)
29842993 {
29852994 chanpart_T * in_part = & ch -> ch_part [PART_IN ];
29862995
2987- if (in_part -> ch_fd != INVALID_FD && in_part -> ch_bufref .br_buf != NULL )
2996+ if (in_part -> ch_fd != INVALID_FD
2997+ && (in_part -> ch_bufref .br_buf != NULL
2998+ || in_part -> ch_writeque .wq_next != NULL ))
29882999 {
29893000 FD_SET ((int )in_part -> ch_fd , wfds );
29903001 if ((int )in_part -> ch_fd >= maxfd )
@@ -3529,6 +3540,31 @@ channel_handle_events(void)
35293540}
35303541# endif
35313542
3543+ /*
3544+ * Set "channel"/"part" to non-blocking.
3545+ */
3546+ void
3547+ channel_set_nonblock (channel_T * channel , ch_part_T part )
3548+ {
3549+ chanpart_T * ch_part = & channel -> ch_part [part ];
3550+ int fd = ch_part -> ch_fd ;
3551+
3552+ if (fd != INVALID_FD )
3553+ {
3554+ #ifdef _WIN32
3555+ if (part == PART_SOCK )
3556+ {
3557+ u_long val = 1 ;
3558+
3559+ ioctlsocket (fd , FIONBIO , & val );
3560+ }
3561+ else
3562+ #endif
3563+ fcntl (fd , F_SETFL , O_NONBLOCK );
3564+ ch_part -> ch_nonblocking = TRUE;
3565+ }
3566+ }
3567+
35323568/*
35333569 * Write "buf" (NUL terminated string) to "channel"/"part".
35343570 * When "fun" is not NULL an error message might be given.
@@ -3538,14 +3574,16 @@ channel_handle_events(void)
35383574channel_send (
35393575 channel_T * channel ,
35403576 ch_part_T part ,
3541- char_u * buf ,
3542- int len ,
3577+ char_u * buf_arg ,
3578+ int len_arg ,
35433579 char * fun )
35443580{
35453581 int res ;
35463582 sock_T fd ;
3583+ chanpart_T * ch_part = & channel -> ch_part [part ];
3584+ int did_use_queue = FALSE;
35473585
3548- fd = channel -> ch_part [ part ]. ch_fd ;
3586+ fd = ch_part -> ch_fd ;
35493587 if (fd == INVALID_FD )
35503588 {
35513589 if (!channel -> ch_error && fun != NULL )
@@ -3561,29 +3599,145 @@ channel_send(
35613599 {
35623600 ch_log_lead ("SEND " , channel );
35633601 fprintf (log_fd , "'" );
3564- ignored = (int )fwrite (buf , len , 1 , log_fd );
3602+ ignored = (int )fwrite (buf_arg , len_arg , 1 , log_fd );
35653603 fprintf (log_fd , "'\n" );
35663604 fflush (log_fd );
35673605 did_log_msg = TRUE;
35683606 }
35693607
3570- if (part == PART_SOCK )
3571- res = sock_write (fd , (char * )buf , len );
3572- else
3573- res = fd_write (fd , (char * )buf , len );
3574- if (res != len )
3608+ for (;;)
35753609 {
3576- if (!channel -> ch_error && fun != NULL )
3610+ writeq_T * wq = & ch_part -> ch_writeque ;
3611+ char_u * buf ;
3612+ int len ;
3613+
3614+ if (wq -> wq_next != NULL )
35773615 {
3578- ch_error (channel , "%s(): write failed" , fun );
3579- EMSG2 (_ ("E631: %s(): write failed" ), fun );
3616+ /* first write what was queued */
3617+ buf = wq -> wq_next -> wq_ga .ga_data ;
3618+ len = wq -> wq_next -> wq_ga .ga_len ;
3619+ did_use_queue = TRUE;
3620+ }
3621+ else
3622+ {
3623+ if (len_arg == 0 )
3624+ /* nothing to write, called from channel_select_check() */
3625+ return OK ;
3626+ buf = buf_arg ;
3627+ len = len_arg ;
35803628 }
3581- channel -> ch_error = TRUE;
3582- return FAIL ;
3583- }
35843629
3585- channel -> ch_error = FALSE;
3586- return OK ;
3630+ if (part == PART_SOCK )
3631+ res = sock_write (fd , (char * )buf , len );
3632+ else
3633+ res = fd_write (fd , (char * )buf , len );
3634+ if (res < 0 && (errno == EWOULDBLOCK
3635+ #ifdef EAGAIN
3636+ || errno == EAGAIN
3637+ #endif
3638+ ))
3639+ res = 0 ; /* nothing got written */
3640+
3641+ if (res >= 0 && ch_part -> ch_nonblocking )
3642+ {
3643+ writeq_T * entry = wq -> wq_next ;
3644+
3645+ if (did_use_queue )
3646+ ch_log (channel , "Sent %d bytes now" , res );
3647+ if (res == len )
3648+ {
3649+ /* Wrote all the buf[len] bytes. */
3650+ if (entry != NULL )
3651+ {
3652+ /* Remove the entry from the write queue. */
3653+ ga_clear (& entry -> wq_ga );
3654+ wq -> wq_next = entry -> wq_next ;
3655+ if (wq -> wq_next == NULL )
3656+ wq -> wq_prev = NULL ;
3657+ else
3658+ wq -> wq_next -> wq_prev = NULL ;
3659+ continue ;
3660+ }
3661+ if (did_use_queue )
3662+ ch_log (channel , "Write queue empty" );
3663+ }
3664+ else
3665+ {
3666+ /* Wrote only buf[res] bytes, can't write more now. */
3667+ if (entry != NULL )
3668+ {
3669+ if (res > 0 )
3670+ {
3671+ /* Remove the bytes that were written. */
3672+ mch_memmove (entry -> wq_ga .ga_data ,
3673+ (char * )entry -> wq_ga .ga_data + res ,
3674+ len - res );
3675+ entry -> wq_ga .ga_len -= res ;
3676+ }
3677+ buf = buf_arg ;
3678+ len = len_arg ;
3679+ }
3680+ else
3681+ {
3682+ buf += res ;
3683+ len -= res ;
3684+ }
3685+ ch_log (channel , "Adding %d bytes to the write queue" , len );
3686+
3687+ /* Append the not written bytes of the argument to the write
3688+ * buffer. Limit entries to 4000 bytes. */
3689+ if (wq -> wq_prev != NULL
3690+ && wq -> wq_prev -> wq_ga .ga_len + len < 4000 )
3691+ {
3692+ writeq_T * last = wq -> wq_prev ;
3693+
3694+ /* append to the last entry */
3695+ if (ga_grow (& last -> wq_ga , len ) == OK )
3696+ {
3697+ mch_memmove ((char * )last -> wq_ga .ga_data
3698+ + last -> wq_ga .ga_len ,
3699+ buf , len );
3700+ last -> wq_ga .ga_len += len ;
3701+ }
3702+ }
3703+ else
3704+ {
3705+ writeq_T * last = (writeq_T * )alloc ((int )sizeof (writeq_T ));
3706+
3707+ if (last != NULL )
3708+ {
3709+ ch_log (channel , "Creating new entry" );
3710+ last -> wq_prev = wq -> wq_prev ;
3711+ last -> wq_next = NULL ;
3712+ if (wq -> wq_prev == NULL )
3713+ wq -> wq_next = last ;
3714+ else
3715+ wq -> wq_prev -> wq_next = last ;
3716+ wq -> wq_prev = last ;
3717+ ga_init2 (& last -> wq_ga , 1 , 1000 );
3718+ if (ga_grow (& last -> wq_ga , len ) == OK )
3719+ {
3720+ mch_memmove (last -> wq_ga .ga_data , buf , len );
3721+ last -> wq_ga .ga_len = len ;
3722+ }
3723+ }
3724+ }
3725+ }
3726+ }
3727+ else if (res != len )
3728+ {
3729+ if (!channel -> ch_error && fun != NULL )
3730+ {
3731+ ch_error (channel , "%s(): write failed" , fun );
3732+ EMSG2 (_ ("E631: %s(): write failed" ), fun );
3733+ }
3734+ channel -> ch_error = TRUE;
3735+ return FAIL ;
3736+ }
3737+
3738+ channel -> ch_error = FALSE;
3739+ return OK ;
3740+ }
35873741}
35883742
35893743/*
@@ -3873,13 +4027,7 @@ channel_select_check(int ret_in, void *rfds_in, void *wfds_in)
38734027 if (ret > 0 && in_part -> ch_fd != INVALID_FD
38744028 && FD_ISSET (in_part -> ch_fd , wfds ))
38754029 {
3876- if (in_part -> ch_buf_append )
3877- {
3878- if (in_part -> ch_bufref .br_buf != NULL )
3879- channel_write_new_lines (in_part -> ch_bufref .br_buf );
3880- }
3881- else
3882- channel_write_in (channel );
4030+ channel_write_input (channel );
38834031 -- ret ;
38844032 }
38854033 }
0 commit comments