Skip to content

Commit 958dc69

Browse files
committed
patch 8.0.0107
Problem: When reading channel output in a timer, messages may go missing. (Skywind) Solution: Add the "drop" option. Write error messages in the channel log. Don't have ch_canread() check for the channel being open.
1 parent 0945eaf commit 958dc69

7 files changed

Lines changed: 145 additions & 19 deletions

File tree

runtime/doc/channel.txt

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,13 @@ Use |ch_status()| to see if the channel could be opened.
155155
func MyCloseHandler(channel)
156156
< Vim will invoke callbacks that handle data before invoking
157157
close_cb, thus when this function is called no more data will
158-
be received.
158+
be passed to the callbacks.
159+
*channel-drop*
160+
"drop" Specifies when to drop messages:
161+
"auto" When there is no callback to handle a message.
162+
The "close_cb" is also considered for this.
163+
"never" All messages will be kept.
164+
159165
*waittime*
160166
"waittime" The time to wait for the connection to be made in
161167
milliseconds. A negative number waits forever.

src/channel.c

Lines changed: 108 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1195,6 +1195,7 @@ channel_set_options(channel_T *channel, jobopt_T *opt)
11951195
if (opt->jo_set & JO_CLOSE_CALLBACK)
11961196
set_callback(&channel->ch_close_cb, &channel->ch_close_partial,
11971197
opt->jo_close_cb, opt->jo_close_partial);
1198+
channel->ch_drop_never = opt->jo_drop_never;
11981199

11991200
if ((opt->jo_set & JO_OUT_IO) && opt->jo_io[PART_OUT] == JIO_BUFFER)
12001201
{
@@ -1918,6 +1919,7 @@ channel_parse_json(channel_T *channel, ch_part_T part)
19181919
clear_tv(&listtv);
19191920
else
19201921
{
1922+
item->jq_no_callback = FALSE;
19211923
item->jq_value = alloc_tv();
19221924
if (item->jq_value == NULL)
19231925
{
@@ -2050,11 +2052,17 @@ remove_json_node(jsonq_T *head, jsonq_T *node)
20502052
* When "id" is positive it must match the first number in the list.
20512053
* When "id" is zero or negative jut get the first message. But not the one
20522054
* with id ch_block_id.
2055+
* When "without_callback" is TRUE also get messages that were pushed back.
20532056
* Return OK when found and return the value in "rettv".
20542057
* Return FAIL otherwise.
20552058
*/
20562059
static int
2057-
channel_get_json(channel_T *channel, ch_part_T part, int id, typval_T **rettv)
2060+
channel_get_json(
2061+
channel_T *channel,
2062+
ch_part_T part,
2063+
int id,
2064+
int without_callback,
2065+
typval_T **rettv)
20582066
{
20592067
jsonq_T *head = &channel->ch_part[part].ch_json_head;
20602068
jsonq_T *item = head->jq_next;
@@ -2064,10 +2072,11 @@ channel_get_json(channel_T *channel, ch_part_T part, int id, typval_T **rettv)
20642072
list_T *l = item->jq_value->vval.v_list;
20652073
typval_T *tv = &l->lv_first->li_tv;
20662074

2067-
if ((id > 0 && tv->v_type == VAR_NUMBER && tv->vval.v_number == id)
2075+
if ((without_callback || !item->jq_no_callback)
2076+
&& ((id > 0 && tv->v_type == VAR_NUMBER && tv->vval.v_number == id)
20682077
|| (id <= 0 && (tv->v_type != VAR_NUMBER
20692078
|| tv->vval.v_number == 0
2070-
|| tv->vval.v_number != channel->ch_part[part].ch_block_id)))
2079+
|| tv->vval.v_number != channel->ch_part[part].ch_block_id))))
20712080
{
20722081
*rettv = item->jq_value;
20732082
if (tv->v_type == VAR_NUMBER)
@@ -2080,6 +2089,65 @@ channel_get_json(channel_T *channel, ch_part_T part, int id, typval_T **rettv)
20802089
return FAIL;
20812090
}
20822091

2092+
/*
2093+
* Put back "rettv" into the JSON queue, there was no callback for it.
2094+
* Takes over the values in "rettv".
2095+
*/
2096+
static void
2097+
channel_push_json(channel_T *channel, ch_part_T part, typval_T *rettv)
2098+
{
2099+
jsonq_T *head = &channel->ch_part[part].ch_json_head;
2100+
jsonq_T *item = head->jq_next;
2101+
jsonq_T *newitem;
2102+
2103+
if (head->jq_prev != NULL && head->jq_prev->jq_no_callback)
2104+
/* last item was pushed back, append to the end */
2105+
item = NULL;
2106+
else while (item != NULL && item->jq_no_callback)
2107+
/* append after the last item that was pushed back */
2108+
item = item->jq_next;
2109+
2110+
newitem = (jsonq_T *)alloc((unsigned)sizeof(jsonq_T));
2111+
if (newitem == NULL)
2112+
clear_tv(rettv);
2113+
else
2114+
{
2115+
newitem->jq_value = alloc_tv();
2116+
if (newitem->jq_value == NULL)
2117+
{
2118+
vim_free(newitem);
2119+
clear_tv(rettv);
2120+
}
2121+
else
2122+
{
2123+
newitem->jq_no_callback = FALSE;
2124+
*newitem->jq_value = *rettv;
2125+
if (item == NULL)
2126+
{
2127+
/* append to the end */
2128+
newitem->jq_prev = head->jq_prev;
2129+
head->jq_prev = newitem;
2130+
newitem->jq_next = NULL;
2131+
if (newitem->jq_prev == NULL)
2132+
head->jq_next = newitem;
2133+
else
2134+
newitem->jq_prev->jq_next = newitem;
2135+
}
2136+
else
2137+
{
2138+
/* append after "item" */
2139+
newitem->jq_prev = item;
2140+
newitem->jq_next = item->jq_next;
2141+
item->jq_next = newitem;
2142+
if (newitem->jq_next == NULL)
2143+
head->jq_prev = newitem;
2144+
else
2145+
newitem->jq_next->jq_prev = newitem;
2146+
}
2147+
}
2148+
}
2149+
}
2150+
20832151
#define CH_JSON_MAX_ARGS 4
20842152

20852153
/*
@@ -2410,11 +2478,11 @@ may_invoke_callback(channel_T *channel, ch_part_T part)
24102478
int argc = 0;
24112479

24122480
/* Get any json message in the queue. */
2413-
if (channel_get_json(channel, part, -1, &listtv) == FAIL)
2481+
if (channel_get_json(channel, part, -1, FALSE, &listtv) == FAIL)
24142482
{
24152483
/* Parse readahead, return when there is still no message. */
24162484
channel_parse_json(channel, part);
2417-
if (channel_get_json(channel, part, -1, &listtv) == FAIL)
2485+
if (channel_get_json(channel, part, -1, FALSE, &listtv) == FAIL)
24182486
return FALSE;
24192487
}
24202488

@@ -2454,7 +2522,7 @@ may_invoke_callback(channel_T *channel, ch_part_T part)
24542522
{
24552523
/* If there is a close callback it may use ch_read() to get the
24562524
* messages. */
2457-
if (channel->ch_close_cb == NULL)
2525+
if (channel->ch_close_cb == NULL && !channel->ch_drop_never)
24582526
drop_messages(channel, part);
24592527
return FALSE;
24602528
}
@@ -2531,7 +2599,7 @@ may_invoke_callback(channel_T *channel, ch_part_T part)
25312599
{
25322600
int done = FALSE;
25332601

2534-
/* invoke the one-time callback with the matching nr */
2602+
/* JSON or JS mode: invoke the one-time callback with the matching nr */
25352603
for (cbitem = cbhead->cq_next; cbitem != NULL; cbitem = cbitem->cq_next)
25362604
if (cbitem->cq_seq_nr == seq_nr)
25372605
{
@@ -2540,7 +2608,17 @@ may_invoke_callback(channel_T *channel, ch_part_T part)
25402608
break;
25412609
}
25422610
if (!done)
2543-
ch_logn(channel, "Dropping message %d without callback", seq_nr);
2611+
{
2612+
if (channel->ch_drop_never)
2613+
{
2614+
/* message must be read with ch_read() */
2615+
channel_push_json(channel, part, listtv);
2616+
listtv = NULL;
2617+
}
2618+
else
2619+
ch_logn(channel, "Dropping message %d without callback",
2620+
seq_nr);
2621+
}
25442622
}
25452623
else if (callback != NULL || buffer != NULL)
25462624
{
@@ -2567,7 +2645,7 @@ may_invoke_callback(channel_T *channel, ch_part_T part)
25672645
}
25682646
}
25692647
else
2570-
ch_log(channel, "Dropping message");
2648+
ch_logn(channel, "Dropping message %d", seq_nr);
25712649

25722650
if (listtv != NULL)
25732651
free_tv(listtv);
@@ -2792,9 +2870,10 @@ channel_close(channel_T *channel, int invoke_close_cb)
27922870
redraw_after_callback();
27932871
}
27942872

2795-
/* any remaining messages are useless now */
2796-
for (part = PART_SOCK; part < PART_IN; ++part)
2797-
drop_messages(channel, part);
2873+
if (!channel->ch_drop_never)
2874+
/* any remaining messages are useless now */
2875+
for (part = PART_SOCK; part < PART_IN; ++part)
2876+
drop_messages(channel, part);
27982877
}
27992878

28002879
channel->ch_nb_close_cb = NULL;
@@ -3091,9 +3170,9 @@ ch_close_part_on_error(
30913170
channel_close_now(channel_T *channel)
30923171
{
30933172
ch_log(channel, "Closing channel because all readable fds are closed");
3094-
channel_close(channel, TRUE);
30953173
if (channel->ch_nb_close_cb != NULL)
30963174
(*channel->ch_nb_close_cb)();
3175+
channel_close(channel, TRUE);
30973176
}
30983177

30993178
/*
@@ -3243,7 +3322,7 @@ channel_read_block(channel_T *channel, ch_part_T part, int timeout)
32433322
* When "id" is -1 accept any message;
32443323
* Blocks until the message is received or the timeout is reached.
32453324
*/
3246-
int
3325+
static int
32473326
channel_read_json_block(
32483327
channel_T *channel,
32493328
ch_part_T part,
@@ -3264,7 +3343,7 @@ channel_read_json_block(
32643343
more = channel_parse_json(channel, part);
32653344

32663345
/* search for message "id" */
3267-
if (channel_get_json(channel, part, id, rettv) == OK)
3346+
if (channel_get_json(channel, part, id, TRUE, rettv) == OK)
32683347
{
32693348
chanpart->ch_block_id = 0;
32703349
return OK;
@@ -4290,6 +4369,20 @@ get_job_options(typval_T *tv, jobopt_T *opt, int supported)
42904369
return FAIL;
42914370
}
42924371
}
4372+
else if (STRCMP(hi->hi_key, "drop") == 0)
4373+
{
4374+
int never = FALSE;
4375+
val = get_tv_string(item);
4376+
4377+
if (STRCMP(val, "never") == 0)
4378+
never = TRUE;
4379+
else if (STRCMP(val, "auto") != 0)
4380+
{
4381+
EMSG2(_(e_invarg2), "drop");
4382+
return FAIL;
4383+
}
4384+
opt->jo_drop_never = never;
4385+
}
42934386
else if (STRCMP(hi->hi_key, "exit_cb") == 0)
42944387
{
42954388
if (!(supported & JO_EXIT_CB))

src/evalfunc.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1786,7 +1786,7 @@ f_ceil(typval_T *argvars, typval_T *rettv)
17861786
static void
17871787
f_ch_canread(typval_T *argvars, typval_T *rettv)
17881788
{
1789-
channel_T *channel = get_channel_arg(&argvars[0], TRUE, TRUE, 0);
1789+
channel_T *channel = get_channel_arg(&argvars[0], FALSE, FALSE, 0);
17901790

17911791
rettv->vval.v_number = 0;
17921792
if (channel != NULL)

src/message.c

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ static int confirm_msg_used = FALSE; /* displaying confirm_msg */
4242
static char_u *confirm_msg = NULL; /* ":confirm" message */
4343
static char_u *confirm_msg_tail; /* tail of confirm_msg */
4444
#endif
45+
#ifdef FEAT_JOB_CHANNEL
46+
static int emsg_to_channel_log = FALSE;
47+
#endif
4548

4649
struct msg_hist
4750
{
@@ -166,6 +169,14 @@ msg_attr_keep(
166169
&& STRCMP(s, last_msg_hist->msg)))
167170
add_msg_hist(s, -1, attr);
168171

172+
#ifdef FEAT_JOB_CHANNEL
173+
if (emsg_to_channel_log)
174+
{
175+
/* Write message in the channel log. */
176+
ch_logs(NULL, "ERROR: %s", (char *)s);
177+
}
178+
#endif
179+
169180
/* When displaying keep_msg, don't let msg_start() free it, caller must do
170181
* that. */
171182
if (s == keep_msg)
@@ -556,6 +567,7 @@ emsg(char_u *s)
556567
{
557568
int attr;
558569
char_u *p;
570+
int r;
559571
#ifdef FEAT_EVAL
560572
int ignore = FALSE;
561573
int severe;
@@ -624,6 +636,9 @@ emsg(char_u *s)
624636
}
625637
redir_write(s, -1);
626638
}
639+
#ifdef FEAT_JOB_CHANNEL
640+
ch_logs(NULL, "ERROR: %s", (char *)s);
641+
#endif
627642
return TRUE;
628643
}
629644

@@ -650,6 +665,9 @@ emsg(char_u *s)
650665
* and a redraw is expected because
651666
* msg_scrolled is non-zero */
652667

668+
#ifdef FEAT_JOB_CHANNEL
669+
emsg_to_channel_log = TRUE;
670+
#endif
653671
/*
654672
* Display name and line number for the source of the error.
655673
*/
@@ -659,7 +677,12 @@ emsg(char_u *s)
659677
* Display the error message itself.
660678
*/
661679
msg_nowait = FALSE; /* wait for this msg */
662-
return msg_attr(s, attr);
680+
r = msg_attr(s, attr);
681+
682+
#ifdef FEAT_JOB_CHANNEL
683+
emsg_to_channel_log = FALSE;
684+
#endif
685+
return r;
663686
}
664687

665688

src/proto/channel.pro

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ void channel_close_in(channel_T *channel);
3333
void channel_clear(channel_T *channel);
3434
void channel_free_all(void);
3535
char_u *channel_read_block(channel_T *channel, ch_part_T part, int timeout);
36-
int channel_read_json_block(channel_T *channel, ch_part_T part, int timeout_arg, int id, typval_T **rettv);
3736
void common_channel_read(typval_T *argvars, typval_T *rettv, int raw);
3837
channel_T *channel_fd2channel(sock_T fd, ch_part_T *partp);
3938
void channel_handle_events(void);

src/structs.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1474,6 +1474,7 @@ struct jsonq_S
14741474
typval_T *jq_value;
14751475
jsonq_T *jq_next;
14761476
jsonq_T *jq_prev;
1477+
int jq_no_callback; /* TRUE when no callback was found */
14771478
};
14781479

14791480
struct cbq_S
@@ -1597,6 +1598,7 @@ struct channel_S {
15971598
partial_T *ch_partial;
15981599
char_u *ch_close_cb; /* call when channel is closed */
15991600
partial_T *ch_close_partial;
1601+
int ch_drop_never;
16001602

16011603
job_T *ch_job; /* Job that uses this channel; this does not
16021604
* count as a reference to avoid a circular
@@ -1684,6 +1686,7 @@ typedef struct
16841686
partial_T *jo_close_partial; /* not referenced! */
16851687
char_u *jo_exit_cb; /* not allocated! */
16861688
partial_T *jo_exit_partial; /* not referenced! */
1689+
int jo_drop_never;
16871690
int jo_waittime;
16881691
int jo_timeout;
16891692
int jo_out_timeout;

src/version.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -764,6 +764,8 @@ static char *(features[]) =
764764

765765
static int included_patches[] =
766766
{ /* Add new patch number below this line */
767+
/**/
768+
107,
767769
/**/
768770
106,
769771
/**/

0 commit comments

Comments
 (0)