diff --git a/CHANGELOG.md b/CHANGELOG.md index d0bbdc69..603d8090 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# v1.7.7-alpha0 + +* Added `TI_PROTO_CLIENT_REQ_EMIT_PEER` protocol, pr #414. + # v1.7.6 * Fixed maximum enumerators per collection, issue #412. diff --git a/inc/ti/proto.t.h b/inc/ti/proto.t.h index 28cb796d..2df093cb 100644 --- a/inc/ti/proto.t.h +++ b/inc/ti/proto.t.h @@ -15,46 +15,48 @@ typedef enum /* * 0x0000xxxx 0..15 fire and forgets from node to client */ - TI_PROTO_CLIENT_NODE_STATUS =0, /* {id:x status:...} */ + TI_PROTO_CLIENT_NODE_STATUS =0, /* {id:x status:...} */ /* 1..4 : Old watch protocol */ - TI_PROTO_CLIENT_WARN =5, /* {warn_msg:..., warn_code: x} */ - TI_PROTO_CLIENT_ROOM_JOIN =6, /* {id:x} */ - TI_PROTO_CLIENT_ROOM_LEAVE =7, /* {id:x} */ - TI_PROTO_CLIENT_ROOM_EMIT =8, /* {id:x, event:name, args:[...] } */ - TI_PROTO_CLIENT_ROOM_DELETE =9, /* {id:x } */ + TI_PROTO_CLIENT_WARN =5, /* {warn_msg:..., warn_code: x} */ + TI_PROTO_CLIENT_ROOM_JOIN =6, /* {id:x} */ + TI_PROTO_CLIENT_ROOM_LEAVE =7, /* {id:x} */ + TI_PROTO_CLIENT_ROOM_EMIT =8, /* {id:x, event:name, args:[...] } */ + TI_PROTO_CLIENT_ROOM_DELETE =9, /* {id:x } */ /* * 0x0001xxxx 16..31 client responses */ - TI_PROTO_CLIENT_RES_PONG =16, /* empty */ - TI_PROTO_CLIENT_RES_OK =17, /* empty */ - TI_PROTO_CLIENT_RES_DATA =18, /* ... */ - TI_PROTO_CLIENT_RES_ERROR =19, /* {error_msg:..., error_code: x} */ + TI_PROTO_CLIENT_RES_PONG =16, /* empty */ + TI_PROTO_CLIENT_RES_OK =17, /* empty */ + TI_PROTO_CLIENT_RES_DATA =18, /* ... */ + TI_PROTO_CLIENT_RES_ERROR =19, /* {error_msg:..., error_code: x} */ /* * 0x0010xxxx 32..63 client requests */ - TI_PROTO_CLIENT_REQ_PING =32, /* empty */ - TI_PROTO_CLIENT_REQ_AUTH =33, /* [user, pass] or token */ - TI_PROTO_CLIENT_REQ_QUERY =34, /* [scope, code, {variable}] */ + TI_PROTO_CLIENT_REQ_PING =32, /* empty */ + TI_PROTO_CLIENT_REQ_AUTH =33, /* [user, pass] or token */ + TI_PROTO_CLIENT_REQ_QUERY =34, /* [scope, code, {variable}] */ /* 35..36 : Old watch protocol */ - _TI_PROTO_CLIENT_DEP_35 =35, /* TODO (COMPAT) */ - _TI_PROTO_CLIENT_DEP_36 =36, /* TODO (COMPAT) */ + _TI_PROTO_CLIENT_DEP_35 =35, /* TODO (COMPAT) */ + _TI_PROTO_CLIENT_DEP_36 =36, /* TODO (COMPAT) */ + + TI_PROTO_CLIENT_REQ_RUN =37, /* [scope, procedure, [[args]/{kw}] */ + TI_PROTO_CLIENT_REQ_JOIN =38, /* [scope, ...room id's]} */ + TI_PROTO_CLIENT_REQ_LEAVE =39, /* [scope, ...room id's]} */ + TI_PROTO_CLIENT_REQ_EMIT =40, /* [scope, room_id, event, ...args] */ + TI_PROTO_CLIENT_REQ_EMIT_PEER =41, /* [scope, room_id, event, ...args] */ - TI_PROTO_CLIENT_REQ_RUN =37, /* [scope, procedure, [[args]/{kw}] */ - TI_PROTO_CLIENT_REQ_JOIN =38, /* [scope, ...room id's]} */ - TI_PROTO_CLIENT_REQ_LEAVE =39, /* [scope, ...room id's]} */ - TI_PROTO_CLIENT_REQ_EMIT =40, /* [scope, room_id, event, ...args] */ /* * 64..127 modules range */ - TI_PROTO_MODULE_CONF =64, /* data, initialize extension */ - TI_PROTO_MODULE_CONF_OK =65, /* empty */ - TI_PROTO_MODULE_CONF_ERR =66, /* empty */ - TI_PROTO_MODULE_REQ =80, /* data, request */ - TI_PROTO_MODULE_RES =81, /* data, response */ - TI_PROTO_MODULE_ERR =82, /* [err_nr, message] */ + TI_PROTO_MODULE_CONF =64, /* data, initialize extension */ + TI_PROTO_MODULE_CONF_OK =65, /* empty */ + TI_PROTO_MODULE_CONF_ERR =66, /* empty */ + TI_PROTO_MODULE_REQ =80, /* data, request */ + TI_PROTO_MODULE_RES =81, /* data, response */ + TI_PROTO_MODULE_ERR =82, /* [err_nr, message] */ /* * protocol definition for node connections diff --git a/inc/ti/room.h b/inc/ti/room.h index b41c4459..3484f7c5 100644 --- a/inc/ti/room.h +++ b/inc/ti/room.h @@ -34,6 +34,7 @@ void ti_room_emit_node_status(ti_room_t * room, const char * status); int ti_room_copy(ti_room_t ** room); int ti_room_emit( ti_room_t * room, + ti_stream_t * stream, ti_query_t * query, vec_t * args, const char * event, @@ -43,6 +44,7 @@ int ti_room_emit( int ti_room_emit_from_pkg( ti_collection_t * collection, ti_pkg_t * pkg, + ti_stream_t * stream, /* null for echo */ ex_t * e); static inline int ti_room_emit_raw( @@ -54,7 +56,14 @@ static inline int ti_room_emit_raw( int flags) { return ti_room_emit( - room, query, args, (const char *) event->data, event->n, deep, flags); + room, + NULL, + query, + args, + (const char *) event->data, + event->n, + deep, + flags); } #endif /* TI_ROOM_H_ */ diff --git a/inc/ti/version.h b/inc/ti/version.h index dc4de8c8..055af1e7 100644 --- a/inc/ti/version.h +++ b/inc/ti/version.h @@ -6,7 +6,7 @@ #define TI_VERSION_MAJOR 1 #define TI_VERSION_MINOR 7 -#define TI_VERSION_PATCH 6 +#define TI_VERSION_PATCH 7 /* The syntax version is used to test compatibility with functions * using the `ti_nodes_check_syntax()` function */ @@ -25,7 +25,7 @@ * "-rc0" * "" */ -#define TI_VERSION_PRE_RELEASE "" +#define TI_VERSION_PRE_RELEASE "-alpha0" #define TI_MAINTAINER \ "Jeroen van der Heijden " diff --git a/itest/requirements.txt b/itest/requirements.txt index 439987ea..74182d78 100644 --- a/itest/requirements.txt +++ b/itest/requirements.txt @@ -1,7 +1,7 @@ psutil pycodestyle msgpack -python-thingsdb>=1.2.2 +python-thingsdb>=1.3.0 requests pytz websockets diff --git a/itest/test_room.py b/itest/test_room.py index a999bd7a..27a40e87 100755 --- a/itest/test_room.py +++ b/itest/test_room.py @@ -303,6 +303,39 @@ async def test_room_name(self, cl0, cl1, cl2): res = await cl0.query('room("A").name();') self.assertEqual(res, "A") + async def test_room_peer_only(self, cl0, cl1, cl2): + await cl0.query(r"""//ti + .room = room(); + .room.set_name("test_peer_room"); + """) + actions0 = [] + actions1 = [] + room0 = ORoom(actions0, 'test_peer_room') + room1 = ORoom(actions1, 'test_peer_room') + + await room0.join(cl0) + await room1.join(cl1) + + await room0.emit('add', 'from_0_to_all') + await room1.emit('add', 'from_1_to_all') + + await room0.emit('add', 'from_0_to_peers', peers_only=True) + await room1.emit('add', 'from_1_to_peers', peers_only=True) + + await asyncio.sleep(0.5) + + self.assertEqual(sorted([ + 'from_0_to_all', + 'from_1_to_all', + 'from_1_to_peers', + ]), sorted(actions0)) + + self.assertEqual(sorted([ + 'from_0_to_all', + 'from_1_to_all', + 'from_0_to_peers', + ]), sorted(actions1)) + if __name__ == '__main__': run_test(TestRoom()) diff --git a/src/ex.c b/src/ex.c index 7deda315..80e161b7 100644 --- a/src/ex.c +++ b/src/ex.c @@ -16,15 +16,6 @@ void ex_setv(ex_t * e, ex_enum errnr, const char * errmsg, va_list args) e->nr = errnr; n = vsnprintf(e->msg, EX_MAX_SZ, errmsg, args); e->n = n < EX_MAX_SZ ? n : EX_MAX_SZ; - - /* TODO: Check below may be removed when we are sure no wrong - * formatting in the code exists */ - if (e->n < 0) - { - e->n = 0; - e->msg[0] = '\0'; - assert(0); - } } void ex_set(ex_t * e, ex_enum errnr, const char * errmsg, ...) diff --git a/src/ti/clients.c b/src/ti/clients.c index 9c2157c1..651de911 100644 --- a/src/ti/clients.c +++ b/src/ti/clients.c @@ -425,7 +425,9 @@ static void clients__on_leave(ti_stream_t * stream, ti_pkg_t * pkg) } } -static void clients__on_emit(ti_stream_t * stream, ti_pkg_t * pkg) +static void clients__on_emit(ti_stream_t * stream, + ti_stream_t * peer, /* null for echo */ + ti_pkg_t * pkg) { ex_t e = {0}; ti_user_t * user = stream->via.user; @@ -446,7 +448,7 @@ static void clients__on_emit(ti_stream_t * stream, ti_pkg_t * pkg) if (!(collection = ti_scope_get_collection(&scope, &e)) || ti_access_check_err(collection->access, user, TI_AUTH_JOIN, &e) || - ti_room_emit_from_pkg(collection, pkg, &e)) + ti_room_emit_from_pkg(collection, pkg, peer, &e)) goto on_error; resp = ti_pkg_new(pkg->id, TI_PROTO_CLIENT_RES_OK, NULL, 0); @@ -575,7 +577,10 @@ void ti_clients_pkg_cb(ti_stream_t * stream, ti_pkg_t * pkg) clients__on_leave(stream, pkg); break; case TI_PROTO_CLIENT_REQ_EMIT: - clients__on_emit(stream, pkg); + clients__on_emit(stream, NULL, pkg); + break; + case TI_PROTO_CLIENT_REQ_EMIT_PEER: + clients__on_emit(stream, stream, pkg); break; case _TI_PROTO_CLIENT_DEP_35: /* deprecated watch request */ case _TI_PROTO_CLIENT_DEP_36: /* deprecated watch request */ diff --git a/src/ti/proto.c b/src/ti/proto.c index adeeb1f2..4d5b557d 100644 --- a/src/ti/proto.c +++ b/src/ti/proto.c @@ -29,6 +29,7 @@ const char * ti_proto_str(ti_proto_enum_t tp) case TI_PROTO_CLIENT_REQ_JOIN: return "CLIENT_REQ_JOIN"; case TI_PROTO_CLIENT_REQ_LEAVE: return "CLIENT_REQ_LEAVE"; case TI_PROTO_CLIENT_REQ_EMIT: return "CLIENT_REQ_EMIT"; + case TI_PROTO_CLIENT_REQ_EMIT_PEER: return "CLIENT_REQ_EMIT_PEER"; case TI_PROTO_MODULE_CONF: return "MODULE_CONF"; case TI_PROTO_MODULE_CONF_OK: return "MODULE_CONF_OK"; diff --git a/src/ti/room.c b/src/ti/room.c index eddca6b7..ce8b7832 100644 --- a/src/ti/room.c +++ b/src/ti/room.c @@ -72,11 +72,13 @@ int ti_room_set_name(ti_room_t * room, ti_name_t * name) return 0; } -static void room__write_rpkg(ti_room_t * room, ti_rpkg_t * rpkg) +static void room__write_rpkg(ti_room_t * room, + ti_stream_t * stream, + ti_rpkg_t * rpkg) { for (vec_each(room->listeners, ti_watch_t, watch)) { - if (ti_stream_is_closed(watch->stream)) + if (watch->stream == stream || ti_stream_is_closed(watch->stream)) continue; if (ti_stream_write_rpkg(watch->stream, rpkg)) @@ -97,7 +99,7 @@ static void room__write_pkg(ti_room_t * room, ti_pkg_t * pkg) log_critical(EX_MEMORY_S); return; } - room__write_rpkg(room, rpkg); + room__write_rpkg(room, NULL, rpkg); ti_rpkg_drop(rpkg); } @@ -159,6 +161,7 @@ void ti_room_emit_data(ti_room_t * room, const void * data, size_t sz) int ti_room_emit( ti_room_t * room, + ti_stream_t * stream, ti_query_t * query, vec_t * args, const char * event, @@ -219,7 +222,7 @@ int ti_room_emit( ti_nodes_write_rpkg(node_rpkg); ti_rpkg_drop(node_rpkg); - room__write_rpkg(room, client_rpkg); + room__write_rpkg(room, stream, client_rpkg); ti_rpkg_drop(client_rpkg); return 0; @@ -238,6 +241,7 @@ int ti_room_emit( int ti_room_emit_from_pkg( ti_collection_t * collection, ti_pkg_t * pkg, + ti_stream_t * stream, ex_t * e) { size_t nargs; @@ -349,6 +353,7 @@ int ti_room_emit_from_pkg( if (ti_room_emit( room, + stream, NULL, args, mp_event.via.str.data,