Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# v1.7.7-alpha0

* Added `TI_PROTO_CLIENT_REQ_EMIT_PEER` protocol, pr #414.

# v1.7.6

* Fixed maximum enumerators per collection, issue #412.
Expand Down
52 changes: 27 additions & 25 deletions inc/ti/proto.t.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,46 +15,48 @@ typedef enum
/*
* 0x0000xxxx 0..15 fire and forgets from node to client
*/
TI_PROTO_CLIENT_NODE_STATUS =0, /* {id:x status:...} */
TI_PROTO_CLIENT_NODE_STATUS =0, /* {id:x status:...} */
/* 1..4 : Old watch protocol */
TI_PROTO_CLIENT_WARN =5, /* {warn_msg:..., warn_code: x} */
TI_PROTO_CLIENT_ROOM_JOIN =6, /* {id:x} */
TI_PROTO_CLIENT_ROOM_LEAVE =7, /* {id:x} */
TI_PROTO_CLIENT_ROOM_EMIT =8, /* {id:x, event:name, args:[...] } */
TI_PROTO_CLIENT_ROOM_DELETE =9, /* {id:x } */
TI_PROTO_CLIENT_WARN =5, /* {warn_msg:..., warn_code: x} */
TI_PROTO_CLIENT_ROOM_JOIN =6, /* {id:x} */
TI_PROTO_CLIENT_ROOM_LEAVE =7, /* {id:x} */
TI_PROTO_CLIENT_ROOM_EMIT =8, /* {id:x, event:name, args:[...] } */
TI_PROTO_CLIENT_ROOM_DELETE =9, /* {id:x } */

/*
* 0x0001xxxx 16..31 client responses
*/
TI_PROTO_CLIENT_RES_PONG =16, /* empty */
TI_PROTO_CLIENT_RES_OK =17, /* empty */
TI_PROTO_CLIENT_RES_DATA =18, /* ... */
TI_PROTO_CLIENT_RES_ERROR =19, /* {error_msg:..., error_code: x} */
TI_PROTO_CLIENT_RES_PONG =16, /* empty */
TI_PROTO_CLIENT_RES_OK =17, /* empty */
TI_PROTO_CLIENT_RES_DATA =18, /* ... */
TI_PROTO_CLIENT_RES_ERROR =19, /* {error_msg:..., error_code: x} */

/*
* 0x0010xxxx 32..63 client requests
*/
TI_PROTO_CLIENT_REQ_PING =32, /* empty */
TI_PROTO_CLIENT_REQ_AUTH =33, /* [user, pass] or token */
TI_PROTO_CLIENT_REQ_QUERY =34, /* [scope, code, {variable}] */
TI_PROTO_CLIENT_REQ_PING =32, /* empty */
TI_PROTO_CLIENT_REQ_AUTH =33, /* [user, pass] or token */
TI_PROTO_CLIENT_REQ_QUERY =34, /* [scope, code, {variable}] */
/* 35..36 : Old watch protocol */
_TI_PROTO_CLIENT_DEP_35 =35, /* TODO (COMPAT) */
_TI_PROTO_CLIENT_DEP_36 =36, /* TODO (COMPAT) */
_TI_PROTO_CLIENT_DEP_35 =35, /* TODO (COMPAT) */
_TI_PROTO_CLIENT_DEP_36 =36, /* TODO (COMPAT) */

TI_PROTO_CLIENT_REQ_RUN =37, /* [scope, procedure, [[args]/{kw}] */
TI_PROTO_CLIENT_REQ_JOIN =38, /* [scope, ...room id's]} */
TI_PROTO_CLIENT_REQ_LEAVE =39, /* [scope, ...room id's]} */
TI_PROTO_CLIENT_REQ_EMIT =40, /* [scope, room_id, event, ...args] */
TI_PROTO_CLIENT_REQ_EMIT_PEER =41, /* [scope, room_id, event, ...args] */

TI_PROTO_CLIENT_REQ_RUN =37, /* [scope, procedure, [[args]/{kw}] */
TI_PROTO_CLIENT_REQ_JOIN =38, /* [scope, ...room id's]} */
TI_PROTO_CLIENT_REQ_LEAVE =39, /* [scope, ...room id's]} */
TI_PROTO_CLIENT_REQ_EMIT =40, /* [scope, room_id, event, ...args] */

/*
* 64..127 modules range
*/
TI_PROTO_MODULE_CONF =64, /* data, initialize extension */
TI_PROTO_MODULE_CONF_OK =65, /* empty */
TI_PROTO_MODULE_CONF_ERR =66, /* empty */
TI_PROTO_MODULE_REQ =80, /* data, request */
TI_PROTO_MODULE_RES =81, /* data, response */
TI_PROTO_MODULE_ERR =82, /* [err_nr, message] */
TI_PROTO_MODULE_CONF =64, /* data, initialize extension */
TI_PROTO_MODULE_CONF_OK =65, /* empty */
TI_PROTO_MODULE_CONF_ERR =66, /* empty */
TI_PROTO_MODULE_REQ =80, /* data, request */
TI_PROTO_MODULE_RES =81, /* data, response */
TI_PROTO_MODULE_ERR =82, /* [err_nr, message] */

/*
* protocol definition for node connections
Expand Down
11 changes: 10 additions & 1 deletion inc/ti/room.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ void ti_room_emit_node_status(ti_room_t * room, const char * status);
int ti_room_copy(ti_room_t ** room);
int ti_room_emit(
ti_room_t * room,
ti_stream_t * stream,
ti_query_t * query,
vec_t * args,
const char * event,
Expand All @@ -43,6 +44,7 @@ int ti_room_emit(
int ti_room_emit_from_pkg(
ti_collection_t * collection,
ti_pkg_t * pkg,
ti_stream_t * stream, /* null for echo */
ex_t * e);

static inline int ti_room_emit_raw(
Expand All @@ -54,7 +56,14 @@ static inline int ti_room_emit_raw(
int flags)
{
return ti_room_emit(
room, query, args, (const char *) event->data, event->n, deep, flags);
room,
NULL,
query,
args,
(const char *) event->data,
event->n,
deep,
flags);
}

#endif /* TI_ROOM_H_ */
4 changes: 2 additions & 2 deletions inc/ti/version.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

#define TI_VERSION_MAJOR 1
#define TI_VERSION_MINOR 7
#define TI_VERSION_PATCH 6
#define TI_VERSION_PATCH 7

/* The syntax version is used to test compatibility with functions
* using the `ti_nodes_check_syntax()` function */
Expand All @@ -25,7 +25,7 @@
* "-rc0"
* ""
*/
#define TI_VERSION_PRE_RELEASE ""
#define TI_VERSION_PRE_RELEASE "-alpha0"

#define TI_MAINTAINER \
"Jeroen van der Heijden <[email protected]>"
Expand Down
2 changes: 1 addition & 1 deletion itest/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
psutil
pycodestyle
msgpack
python-thingsdb>=1.2.2
python-thingsdb>=1.3.0
requests
pytz
websockets
33 changes: 33 additions & 0 deletions itest/test_room.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,39 @@ async def test_room_name(self, cl0, cl1, cl2):
res = await cl0.query('room("A").name();')
self.assertEqual(res, "A")

async def test_room_peer_only(self, cl0, cl1, cl2):
await cl0.query(r"""//ti
.room = room();
.room.set_name("test_peer_room");
""")
actions0 = []
actions1 = []
room0 = ORoom(actions0, 'test_peer_room')
room1 = ORoom(actions1, 'test_peer_room')

await room0.join(cl0)
await room1.join(cl1)

await room0.emit('add', 'from_0_to_all')
await room1.emit('add', 'from_1_to_all')

await room0.emit('add', 'from_0_to_peers', peers_only=True)
await room1.emit('add', 'from_1_to_peers', peers_only=True)

await asyncio.sleep(0.5)

self.assertEqual(sorted([
'from_0_to_all',
'from_1_to_all',
'from_1_to_peers',
]), sorted(actions0))

self.assertEqual(sorted([
'from_0_to_all',
'from_1_to_all',
'from_0_to_peers',
]), sorted(actions1))


if __name__ == '__main__':
run_test(TestRoom())
9 changes: 0 additions & 9 deletions src/ex.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,6 @@ void ex_setv(ex_t * e, ex_enum errnr, const char * errmsg, va_list args)
e->nr = errnr;
n = vsnprintf(e->msg, EX_MAX_SZ, errmsg, args);
e->n = n < EX_MAX_SZ ? n : EX_MAX_SZ;

/* TODO: Check below may be removed when we are sure no wrong
* formatting in the code exists */
if (e->n < 0)
{
e->n = 0;
e->msg[0] = '\0';
assert(0);
}
}

void ex_set(ex_t * e, ex_enum errnr, const char * errmsg, ...)
Expand Down
11 changes: 8 additions & 3 deletions src/ti/clients.c
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,9 @@ static void clients__on_leave(ti_stream_t * stream, ti_pkg_t * pkg)
}
}

static void clients__on_emit(ti_stream_t * stream, ti_pkg_t * pkg)
static void clients__on_emit(ti_stream_t * stream,
ti_stream_t * peer, /* null for echo */
ti_pkg_t * pkg)
{
ex_t e = {0};
ti_user_t * user = stream->via.user;
Expand All @@ -446,7 +448,7 @@ static void clients__on_emit(ti_stream_t * stream, ti_pkg_t * pkg)

if (!(collection = ti_scope_get_collection(&scope, &e)) ||
ti_access_check_err(collection->access, user, TI_AUTH_JOIN, &e) ||
ti_room_emit_from_pkg(collection, pkg, &e))
ti_room_emit_from_pkg(collection, pkg, peer, &e))
goto on_error;

resp = ti_pkg_new(pkg->id, TI_PROTO_CLIENT_RES_OK, NULL, 0);
Expand Down Expand Up @@ -575,7 +577,10 @@ void ti_clients_pkg_cb(ti_stream_t * stream, ti_pkg_t * pkg)
clients__on_leave(stream, pkg);
break;
case TI_PROTO_CLIENT_REQ_EMIT:
clients__on_emit(stream, pkg);
clients__on_emit(stream, NULL, pkg);
break;
case TI_PROTO_CLIENT_REQ_EMIT_PEER:
clients__on_emit(stream, stream, pkg);
break;
case _TI_PROTO_CLIENT_DEP_35: /* deprecated watch request */
case _TI_PROTO_CLIENT_DEP_36: /* deprecated watch request */
Expand Down
1 change: 1 addition & 0 deletions src/ti/proto.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const char * ti_proto_str(ti_proto_enum_t tp)
case TI_PROTO_CLIENT_REQ_JOIN: return "CLIENT_REQ_JOIN";
case TI_PROTO_CLIENT_REQ_LEAVE: return "CLIENT_REQ_LEAVE";
case TI_PROTO_CLIENT_REQ_EMIT: return "CLIENT_REQ_EMIT";
case TI_PROTO_CLIENT_REQ_EMIT_PEER: return "CLIENT_REQ_EMIT_PEER";

case TI_PROTO_MODULE_CONF: return "MODULE_CONF";
case TI_PROTO_MODULE_CONF_OK: return "MODULE_CONF_OK";
Expand Down
13 changes: 9 additions & 4 deletions src/ti/room.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,13 @@ int ti_room_set_name(ti_room_t * room, ti_name_t * name)
return 0;
}

static void room__write_rpkg(ti_room_t * room, ti_rpkg_t * rpkg)
static void room__write_rpkg(ti_room_t * room,
ti_stream_t * stream,
ti_rpkg_t * rpkg)
{
for (vec_each(room->listeners, ti_watch_t, watch))
{
if (ti_stream_is_closed(watch->stream))
if (watch->stream == stream || ti_stream_is_closed(watch->stream))
continue;

if (ti_stream_write_rpkg(watch->stream, rpkg))
Expand All @@ -97,7 +99,7 @@ static void room__write_pkg(ti_room_t * room, ti_pkg_t * pkg)
log_critical(EX_MEMORY_S);
return;
}
room__write_rpkg(room, rpkg);
room__write_rpkg(room, NULL, rpkg);
ti_rpkg_drop(rpkg);
}

Expand Down Expand Up @@ -159,6 +161,7 @@ void ti_room_emit_data(ti_room_t * room, const void * data, size_t sz)

int ti_room_emit(
ti_room_t * room,
ti_stream_t * stream,
ti_query_t * query,
vec_t * args,
const char * event,
Expand Down Expand Up @@ -219,7 +222,7 @@ int ti_room_emit(
ti_nodes_write_rpkg(node_rpkg);
ti_rpkg_drop(node_rpkg);

room__write_rpkg(room, client_rpkg);
room__write_rpkg(room, stream, client_rpkg);
ti_rpkg_drop(client_rpkg);

return 0;
Expand All @@ -238,6 +241,7 @@ int ti_room_emit(
int ti_room_emit_from_pkg(
ti_collection_t * collection,
ti_pkg_t * pkg,
ti_stream_t * stream,
ex_t * e)
{
size_t nargs;
Expand Down Expand Up @@ -349,6 +353,7 @@ int ti_room_emit_from_pkg(

if (ti_room_emit(
room,
stream,
NULL,
args,
mp_event.via.str.data,
Expand Down