Skip to content

Commit b2658a1

Browse files
committed
patch 7.4.1787
Problem: When a job ends the close callback is invoked before other callbacks. On Windows the close callback is not called. Solution: First invoke out/err callbacks before the close callback. Make the close callback work on Windows.
1 parent d10abe5 commit b2658a1

5 files changed

Lines changed: 120 additions & 50 deletions

File tree

src/channel.c

Lines changed: 84 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@
5454
# define fd_close(sd) close(sd)
5555
#endif
5656

57+
static void channel_read(channel_T *channel, int part, char *func);
58+
5759
/* Whether a redraw is needed for appending a line to a buffer. */
5860
static int channel_need_redraw = FALSE;
5961

@@ -2427,18 +2429,28 @@ channel_close(channel_T *channel, int invoke_close_cb)
24272429
typval_T argv[1];
24282430
typval_T rettv;
24292431
int dummy;
2432+
int part;
24302433

2431-
/* invoke the close callback; increment the refcount to avoid it
2432-
* being freed halfway */
2433-
ch_logs(channel, "Invoking close callback %s",
2434-
(char *)channel->ch_close_cb);
2435-
argv[0].v_type = VAR_CHANNEL;
2436-
argv[0].vval.v_channel = channel;
2434+
/* Invoke callbacks before the close callback, since it's weird to
2435+
* first invoke the close callback. Increment the refcount to avoid
2436+
* the channel being freed halfway. */
24372437
++channel->ch_refcount;
2438-
call_func(channel->ch_close_cb, (int)STRLEN(channel->ch_close_cb),
2438+
for (part = PART_SOCK; part <= PART_ERR; ++part)
2439+
while (may_invoke_callback(channel, part))
2440+
;
2441+
2442+
/* Invoke the close callback, if still set. */
2443+
if (channel->ch_close_cb != NULL)
2444+
{
2445+
ch_logs(channel, "Invoking close callback %s",
2446+
(char *)channel->ch_close_cb);
2447+
argv[0].v_type = VAR_CHANNEL;
2448+
argv[0].vval.v_channel = channel;
2449+
call_func(channel->ch_close_cb, (int)STRLEN(channel->ch_close_cb),
24392450
&rettv, 1, argv, 0L, 0L, &dummy, TRUE,
24402451
channel->ch_close_partial, NULL);
2441-
clear_tv(&rettv);
2452+
clear_tv(&rettv);
2453+
}
24422454
--channel->ch_refcount;
24432455

24442456
/* the callback is only called once */
@@ -2592,11 +2604,19 @@ channel_fill_poll_write(int nfd_in, struct pollfd *fds)
25922604
}
25932605
#endif
25942606

2607+
typedef enum {
2608+
CW_READY,
2609+
CW_NOT_READY,
2610+
CW_ERROR
2611+
} channel_wait_result;
2612+
25952613
/*
25962614
* Check for reading from "fd" with "timeout" msec.
2597-
* Return FAIL when there is nothing to read.
2615+
* Return CW_READY when there is something to read.
2616+
* Return CW_NOT_READY when there is nothing to read.
2617+
* Return CW_ERROR when there is an error.
25982618
*/
2599-
static int
2619+
static channel_wait_result
26002620
channel_wait(channel_T *channel, sock_T fd, int timeout)
26012621
{
26022622
if (timeout > 0)
@@ -2613,9 +2633,12 @@ channel_wait(channel_T *channel, sock_T fd, int timeout)
26132633
/* reading from a pipe, not a socket */
26142634
while (TRUE)
26152635
{
2616-
if (PeekNamedPipe((HANDLE)fd, NULL, 0, NULL, &nread, NULL)
2617-
&& nread > 0)
2618-
return OK;
2636+
int r = PeekNamedPipe((HANDLE)fd, NULL, 0, NULL, &nread, NULL);
2637+
2638+
if (r && nread > 0)
2639+
return CW_READY;
2640+
if (r == 0)
2641+
return CW_ERROR;
26192642

26202643
/* perhaps write some buffer lines */
26212644
channel_write_any_lines();
@@ -2665,7 +2688,7 @@ channel_wait(channel_T *channel, sock_T fd, int timeout)
26652688
if (ret > 0)
26662689
{
26672690
if (FD_ISSET(fd, &rfds))
2668-
return OK;
2691+
return CW_READY;
26692692
channel_write_any_lines();
26702693
continue;
26712694
}
@@ -2683,23 +2706,52 @@ channel_wait(channel_T *channel, sock_T fd, int timeout)
26832706
if (poll(fds, nfd, timeout) > 0)
26842707
{
26852708
if (fds[0].revents & POLLIN)
2686-
return OK;
2709+
return CW_READY;
26872710
channel_write_any_lines();
26882711
continue;
26892712
}
26902713
break;
26912714
}
26922715
#endif
26932716
}
2694-
return FAIL;
2717+
return CW_NOT_READY;
2718+
}
2719+
2720+
static void
2721+
channel_close_on_error(channel_T *channel, int part, char *func)
2722+
{
2723+
/* Do not call emsg(), most likely the other end just exited. */
2724+
ch_errors(channel, "%s(): Cannot read from channel", func);
2725+
2726+
/* Queue a "DETACH" netbeans message in the command queue in order to
2727+
* terminate the netbeans session later. Do not end the session here
2728+
* directly as we may be running in the context of a call to
2729+
* netbeans_parse_messages():
2730+
* netbeans_parse_messages
2731+
* -> autocmd triggered while processing the netbeans cmd
2732+
* -> ui_breakcheck
2733+
* -> gui event loop or select loop
2734+
* -> channel_read()
2735+
* Don't send "DETACH" for a JS or JSON channel.
2736+
*/
2737+
if (channel->ch_part[part].ch_mode == MODE_RAW
2738+
|| channel->ch_part[part].ch_mode == MODE_NL)
2739+
channel_save(channel, part, (char_u *)DETACH_MSG_RAW,
2740+
(int)STRLEN(DETACH_MSG_RAW), FALSE, "PUT ");
2741+
2742+
/* When reading from stdout is not possible, assume the other side has
2743+
* died. */
2744+
channel_close(channel, TRUE);
2745+
if (channel->ch_nb_close_cb != NULL)
2746+
(*channel->ch_nb_close_cb)();
26952747
}
26962748

26972749
/*
26982750
* Read from channel "channel" for as long as there is something to read.
26992751
* "part" is PART_SOCK, PART_OUT or PART_ERR.
27002752
* The data is put in the read queue.
27012753
*/
2702-
void
2754+
static void
27032755
channel_read(channel_T *channel, int part, char *func)
27042756
{
27052757
static char_u *buf = NULL;
@@ -2729,7 +2781,7 @@ channel_read(channel_T *channel, int part, char *func)
27292781
* MAXMSGSIZE long. */
27302782
for (;;)
27312783
{
2732-
if (channel_wait(channel, fd, 0) == FAIL)
2784+
if (channel_wait(channel, fd, 0) != CW_READY)
27332785
break;
27342786
if (use_socket)
27352787
len = sock_read(fd, (char *)buf, MAXMSGSIZE);
@@ -2747,33 +2799,7 @@ channel_read(channel_T *channel, int part, char *func)
27472799

27482800
/* Reading a disconnection (readlen == 0), or an error. */
27492801
if (readlen <= 0)
2750-
{
2751-
/* Do not give an error message, most likely the other end just
2752-
* exited. */
2753-
ch_errors(channel, "%s(): Cannot read from channel", func);
2754-
2755-
/* Queue a "DETACH" netbeans message in the command queue in order to
2756-
* terminate the netbeans session later. Do not end the session here
2757-
* directly as we may be running in the context of a call to
2758-
* netbeans_parse_messages():
2759-
* netbeans_parse_messages
2760-
* -> autocmd triggered while processing the netbeans cmd
2761-
* -> ui_breakcheck
2762-
* -> gui event loop or select loop
2763-
* -> channel_read()
2764-
* Don't send "DETACH" for a JS or JSON channel.
2765-
*/
2766-
if (channel->ch_part[part].ch_mode == MODE_RAW
2767-
|| channel->ch_part[part].ch_mode == MODE_NL)
2768-
channel_save(channel, part, (char_u *)DETACH_MSG_RAW,
2769-
(int)STRLEN(DETACH_MSG_RAW), FALSE, "PUT ");
2770-
2771-
/* When reading from stdout is not possible, assume the other side has
2772-
* died. */
2773-
channel_close(channel, TRUE);
2774-
if (channel->ch_nb_close_cb != NULL)
2775-
(*channel->ch_nb_close_cb)();
2776-
}
2802+
channel_close_on_error(channel, part, func);
27772803

27782804
#if defined(CH_HAS_GUI) && defined(FEAT_GUI_GTK)
27792805
/* signal the main loop that there is something to read */
@@ -2812,7 +2838,7 @@ channel_read_block(channel_T *channel, int part, int timeout)
28122838
/* Wait for up to the channel timeout. */
28132839
if (fd == INVALID_FD)
28142840
return NULL;
2815-
if (channel_wait(channel, fd, timeout) == FAIL)
2841+
if (channel_wait(channel, fd, timeout) != CW_READY)
28162842
{
28172843
ch_log(channel, "Timed out");
28182844
return NULL;
@@ -2916,7 +2942,8 @@ channel_read_json_block(
29162942
timeout = timeout_arg;
29172943
}
29182944
fd = chanpart->ch_fd;
2919-
if (fd == INVALID_FD || channel_wait(channel, fd, timeout) == FAIL)
2945+
if (fd == INVALID_FD
2946+
|| channel_wait(channel, fd, timeout) != CW_READY)
29202947
{
29212948
if (timeout == timeout_arg)
29222949
{
@@ -3037,8 +3064,16 @@ channel_handle_events(void)
30373064
for (part = PART_SOCK; part <= PART_ERR; ++part)
30383065
{
30393066
fd = channel->ch_part[part].ch_fd;
3040-
if (fd != INVALID_FD && channel_wait(channel, fd, 0) == OK)
3041-
channel_read(channel, part, "channel_handle_events");
3067+
if (fd != INVALID_FD)
3068+
{
3069+
int r = channel_wait(channel, fd, 0);
3070+
3071+
if (r == CW_READY)
3072+
channel_read(channel, part, "channel_handle_events");
3073+
else if (r == CW_ERROR)
3074+
channel_close_on_error(channel, part,
3075+
"channel_handle_events()");
3076+
}
30423077
}
30433078
}
30443079
}

src/proto/channel.pro

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ void channel_close(channel_T *channel, int invoke_close_cb);
2626
char_u *channel_peek(channel_T *channel, int part);
2727
void channel_clear(channel_T *channel);
2828
void channel_free_all(void);
29-
void channel_read(channel_T *channel, int part, char *func);
3029
char_u *channel_read_block(channel_T *channel, int part, int timeout);
3130
int channel_read_json_block(channel_T *channel, int part, int timeout_arg, int id, typval_T **rettv);
3231
void common_channel_read(typval_T *argvars, typval_T *rettv, int raw);

src/testdir/test_channel.vim

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1048,6 +1048,38 @@ func Test_out_cb()
10481048
endtry
10491049
endfunc
10501050

1051+
func Test_out_close_cb()
1052+
if !has('job')
1053+
return
1054+
endif
1055+
call ch_log('Test_out_close_cb()')
1056+
1057+
let s:counter = 1
1058+
let s:outmsg = 0
1059+
let s:closemsg = 0
1060+
func! OutHandler(chan, msg)
1061+
let s:outmsg = s:counter
1062+
let s:counter += 1
1063+
endfunc
1064+
func! CloseHandler(chan)
1065+
let s:closemsg = s:counter
1066+
let s:counter += 1
1067+
endfunc
1068+
let job = job_start(s:python . " test_channel_pipe.py quit now",
1069+
\ {'out_cb': 'OutHandler',
1070+
\ 'close_cb': 'CloseHandler'})
1071+
call assert_equal("run", job_status(job))
1072+
try
1073+
call s:waitFor('s:closemsg != 0 && s:outmsg != 0')
1074+
call assert_equal(1, s:outmsg)
1075+
call assert_equal(2, s:closemsg)
1076+
finally
1077+
call job_stop(job)
1078+
delfunc OutHandler
1079+
delfunc CloseHandler
1080+
endtry
1081+
endfunc
1082+
10511083
""""""""""
10521084

10531085
let s:unletResponse = ''

src/testdir/test_channel_pipe.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
else:
1717
print(sys.argv[1])
1818
sys.stdout.flush()
19+
if sys.argv[1].startswith("quit"):
20+
sys.exit(0)
1921

2022
while True:
2123
typed = sys.stdin.readline()

src/version.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -753,6 +753,8 @@ static char *(features[]) =
753753

754754
static int included_patches[] =
755755
{ /* Add new patch number below this line */
756+
/**/
757+
1787,
756758
/**/
757759
1786,
758760
/**/

0 commit comments

Comments
 (0)