Skip to content

Commit 21f76f6

Browse files
committed
Add mmsclient example script
--- Examples: + Add simplistic hexdump approach + Add MMSClient example script
1 parent 5054225 commit 21f76f6

4 files changed

Lines changed: 449 additions & 1 deletion

File tree

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,3 +71,4 @@ write_to = "src/icspacket/_version.py"
7171
# --- example scripts
7272
[project.scripts]
7373
"mms_utility.py" = "icspacket.examples.mms_utility:cli_main"
74+
"mmsclient.py" = "icspacket.examples.mmsclient:cli_main"
Lines changed: 357 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,357 @@
1+
# This file is part of icspacket.
2+
# Copyright (C) 2025-present MatrixEditor @ github
3+
#
4+
# This program is free software: you can redistribute it and/or modify
5+
# it under the terms of the GNU General Public License as published by
6+
# the Free Software Foundation, either version 3 of the License, or
7+
# (at your option) any later version.
8+
#
9+
# This program is distributed in the hope that it will be useful,
10+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
# GNU General Public License for more details.
13+
#
14+
# You should have received a copy of the GNU General Public License
15+
# along with this program. If not, see <https://www.gnu.org/licenses/>.
16+
# pyright: reportUnusedCallResult=false, reportGeneralTypeIssues=false
17+
import logging
18+
import sys
19+
import pathlib
20+
import cmd2
21+
22+
from typing import Any
23+
from rich.console import Console
24+
from typing_extensions import override
25+
26+
from rich import box
27+
from rich.table import Table
28+
29+
from icspacket.proto.cotp.structs import TPDU_Size
30+
from icspacket.proto.mms._mms import FileName, ServiceError
31+
from icspacket.proto.mms.connection import MMS_Connection
32+
from icspacket.proto.mms.exceptions import MMSConnectionError, MMSUnknownServiceError
33+
34+
from icspacket.examples.util.mms import init_mms_connection
35+
from icspacket.examples.util import hexdump
36+
37+
38+
class MMSClient(cmd2.Cmd):
39+
do_exit = cmd2.Cmd.do_quit
40+
41+
def __init__(self, conn: MMS_Connection) -> None:
42+
"""Interactive MMS client shell.
43+
44+
This command-line shell allows interactive navigation and manipulation
45+
of remote MMS (Manufacturing Message Specification) file systems, as
46+
well as execution of basic MMS services. It is designed to mimic the
47+
behavior of familiar UNIX shell commands while operating against
48+
MMS-accessible resources.
49+
50+
:param conn: Active MMS connection instance
51+
:type conn: MMS_Connection
52+
"""
53+
super().__init__(
54+
allow_cli_args=False,
55+
allow_redirection=False,
56+
)
57+
self.__connection = conn
58+
self.local_dir = pathlib.Path(".").absolute()
59+
self.remote_dir = pathlib.Path("/")
60+
self.prompt = "mms> "
61+
# disable default commands
62+
del [
63+
cmd2.Cmd.do_alias,
64+
cmd2.Cmd.do_edit,
65+
cmd2.Cmd.do_macro,
66+
cmd2.Cmd.do_py,
67+
cmd2.Cmd.do_run_pyscript,
68+
cmd2.Cmd.do_run_script,
69+
cmd2.Cmd.do_set,
70+
cmd2.Cmd.do_shell,
71+
cmd2.Cmd.do_shortcuts,
72+
]
73+
self.remove_settable("debug")
74+
75+
@property
76+
def conn(self) -> MMS_Connection:
77+
"""Return the underlying MMS connection object."""
78+
return self.__connection
79+
80+
def do_id(self, _) -> None:
81+
"""id
82+
83+
Display the remote MMS server identity.
84+
85+
This command sends an MMS Identify request and prints the
86+
response, including vendor, model, and revision information.
87+
"""
88+
name, model, revision = self.conn.identify()
89+
self.poutput(f"Vendor: {name}")
90+
self.poutput(f"Model: {model}")
91+
self.poutput(f"Revision: {revision}")
92+
93+
def do_ldir(self, arg) -> None:
94+
"""ldir [newdir]
95+
96+
List the currently used *local* directory or specify a new one
97+
"""
98+
if arg:
99+
self.local_dir = pathlib.Path(arg).absolute().resolve()
100+
else:
101+
self.poutput(str(self.local_dir))
102+
103+
def do_rdir(self, arg) -> None:
104+
"""rdir [newdir]
105+
106+
List the currently used *remote* directory or specify a new one
107+
"""
108+
if arg:
109+
if arg.startswith("/"):
110+
self.remote_dir = pathlib.Path(arg).resolve()
111+
else:
112+
self.remote_dir = pathlib.Path(arg).absolute().resolve()
113+
else:
114+
self.poutput(str(self.remote_dir))
115+
116+
def do_cd(self, arg) -> None:
117+
"""cd newdir
118+
119+
Change the current remote MMS working directory.
120+
121+
This affects subsequent relative path operations, such as
122+
ls, get, rename, and del.
123+
"""
124+
self.remote_dir = (self.remote_dir / arg).absolute().resolve()
125+
logging.info("Changed remote directory to '%s'", self.remote_dir)
126+
127+
def _handle_file_service_error(
128+
self, exc: Exception, service_error: ServiceError, msg: str
129+
):
130+
if (
131+
service_error.errorClass.present
132+
!= ServiceError.errorClass_TYPE.PRESENT.PR_file
133+
):
134+
logging.exception("Could not open file", exc)
135+
else:
136+
code = service_error.errorClass.file
137+
logging.error(
138+
"%s: [bold red]%s[/] [red](%s)[/]",
139+
msg,
140+
code.name,
141+
code,
142+
)
143+
144+
ls_parser = cmd2.Cmd2ArgumentParser()
145+
ls_parser.add_argument(
146+
"directory",
147+
type=str,
148+
nargs="?",
149+
help="Remote directory to list (defaults to current remote directory)",
150+
)
151+
152+
@cmd2.with_argparser(ls_parser)
153+
def do_ls(self, args) -> None:
154+
"""List the contents of a remote MMS directory."""
155+
directory = args.directory
156+
remote_dir = self.remote_dir
157+
if directory:
158+
remote_dir /= directory
159+
160+
name = FileName([str(remote_dir)])
161+
try:
162+
if str(remote_dir) == "/":
163+
# show root dir
164+
name = None
165+
166+
entries = self.conn.list_directory(name)
167+
168+
except MMSConnectionError as error:
169+
service_error = error.error
170+
self._handle_file_service_error(
171+
error, service_error, f"Could not list directory {str(remote_dir)!r}"
172+
)
173+
else:
174+
console = Console()
175+
table = Table(
176+
title=f"Information of {remote_dir}",
177+
safe_box=True,
178+
expand=True,
179+
box=box.ASCII_DOUBLE_HEAD,
180+
)
181+
table.add_column("Name", justify="left", style="bold")
182+
table.add_column("Size", justify="left")
183+
table.add_column("Last Modified", justify="right")
184+
for entry in entries:
185+
raw_path = "".join(list(entry.fileName))
186+
path = pathlib.Path(raw_path.removeprefix(str(remote_dir) + "/"))
187+
table.add_row(str(path), str(entry.fileAttributes.sizeOfFile), "N/A")
188+
console.print(table)
189+
190+
# fmt: off
191+
get_parser = cmd2.Cmd2ArgumentParser(add_help=True)
192+
get_parser.add_argument("remote_name", type=str, help="Remote file name")
193+
get_parser.add_argument("local_name", type=str, nargs="?", help="Optional local file name to store the file under; defaults to the remote file name")
194+
get_parser.add_argument("--stdout", action="store_true", help="Write file contents to stdout (hex) instead of saving locally")
195+
get_parser.add_argument("--unsafe", action="store_true", help="Write file contents as text to stdout",)
196+
# fmt: on
197+
198+
@cmd2.with_argparser(get_parser)
199+
def do_get(self, args) -> None:
200+
"""Retrieve a file from the remote MMS server.
201+
202+
Supports saving to a local file or streaming to standard output.
203+
"""
204+
console = Console()
205+
remote_file_path = self.remote_dir / args.remote_name
206+
remote_file_name = FileName([str(remote_file_path)])
207+
if not args.local_name:
208+
local_file_path = self.local_dir / remote_file_path.name
209+
else:
210+
local_file_path = pathlib.Path(args.local_name)
211+
212+
logging.debug("Saving to '%s'", local_file_path)
213+
try:
214+
logging.debug("Opening '%s'", remote_file_path)
215+
handle = self.conn.file_open(remote_file_name)
216+
logging.info(
217+
"Downloading %s (%d bytes)...",
218+
remote_file_path.name,
219+
handle.attributes.sizeOfFile.value,
220+
)
221+
with console.status("Retrieving file..."):
222+
data = self.conn.file_read(handle)
223+
self.conn.file_close(handle)
224+
225+
if args.stdout:
226+
if args.unsafe:
227+
self.poutput(data.decode(errors="replace"))
228+
else:
229+
self.poutput(hexdump.hexdump(data))
230+
else:
231+
with local_file_path.open("wb") as local_fp:
232+
local_fp.write(data)
233+
234+
except MMSConnectionError as error:
235+
service_error = error.error
236+
self._handle_file_service_error(
237+
error, service_error, f"Could not get file {str(remote_file_path)!r}"
238+
)
239+
240+
rename_parser = cmd2.Cmd2ArgumentParser()
241+
rename_parser.add_argument("old_name", type=str, help="Remote old file name")
242+
rename_parser.add_argument("new_name", type=str, help="Remote new file name")
243+
244+
@cmd2.with_argparser(rename_parser)
245+
def do_rename(self, args) -> None:
246+
"""Rename a file in the remote MMS server."""
247+
remote_file_path = self.remote_dir / args.old_name
248+
remote_file_name = FileName([str(remote_file_path)])
249+
new_remote_file_path = self.remote_dir / args.new_name
250+
new_remote_file_name = FileName([str(new_remote_file_path)])
251+
logging.info("Renaming '%s' to '%s'", remote_file_path, new_remote_file_path)
252+
try:
253+
self.conn.file_rename(remote_file_name, new_remote_file_name)
254+
except MMSConnectionError as error:
255+
service_error = error.error
256+
self._handle_file_service_error(
257+
error, service_error, f"Could not rename file {str(remote_file_path)!r}"
258+
)
259+
260+
del_parser = cmd2.Cmd2ArgumentParser()
261+
del_parser.add_argument("remote_name", type=str, help="Remote file name to delete")
262+
263+
@cmd2.with_argparser(del_parser)
264+
def do_del(self, args) -> None:
265+
"""Delete a file from the remote MMS server."""
266+
remote_file_path = self.remote_dir / args.remote_name
267+
remote_file_name = FileName([str(remote_file_path)])
268+
logging.info("Deleting '%s'", remote_file_path)
269+
try:
270+
self.conn.file_delete(remote_file_name)
271+
except MMSConnectionError as error:
272+
service_error = error.error
273+
self._handle_file_service_error(
274+
error, service_error, f"Could not delete file {str(remote_file_path)!r}"
275+
)
276+
277+
@override
278+
def pexcept(self, msg: Any, *, end: str = "\n", apply_style: bool = True) -> None:
279+
if isinstance(msg, MMSUnknownServiceError):
280+
return logging.error(str(msg))
281+
282+
return super().pexcept(msg, end=end, apply_style=apply_style)
283+
284+
285+
def cli_main():
286+
import argparse
287+
288+
from rich.console import Console
289+
from icspacket import __version__
290+
from icspacket.core import logger
291+
292+
parser = argparse.ArgumentParser(
293+
usage="%(prog)s [options] host [command [args...]]",
294+
)
295+
296+
# fmt: off
297+
# ------------------------------------------------------------------------
298+
# Authentication options
299+
# ------------------------------------------------------------------------
300+
auth_group = parser.add_argument_group("Authentication Options", "ACSE/Password authentication for MMS association")
301+
auth_group.add_argument("--auth", type=str, metavar="<qualifier>@<title>:<password>", help="Password-based authentication specification, e.g., '100@operator:secret'", default=None)
302+
auth_group.add_argument("--auth-stdin", action="store_true", help="Read authentication specification from stdin (interactive prompt)", default=False)
303+
304+
# ------------------------------------------------------------------------
305+
# Connection options
306+
# ------------------------------------------------------------------------
307+
conn_group = parser.add_argument_group("Connection Options","Specify transport layer settings and target host information")
308+
conn_group.add_argument("-p", "--port", type=int, help="TCP port of the target MMS server (default: 102)", default=102)
309+
conn_group.add_argument("--max-tpdu-size", type=int, metavar="SIZE", help="Maximum TPDU size to negotiate during COTP connection", default=TPDU_Size.SIZE_1024)
310+
conn_group.add_argument("--timeout", type=float, metavar="SEC", help="Timeout in seconds for transport-level operations (default: None)", default=None)
311+
conn_group.add_argument("host", type=str, help="Target host (IP address or hostname) to establish MMS connection")
312+
313+
# ------------------------------------------------------------------------
314+
# Logging options
315+
# ------------------------------------------------------------------------
316+
log_group = parser.add_argument_group("Logging Options", "Control verbosity and formatting of log messages")
317+
log_group.add_argument("-v", action="count", help="Increase logging verbosity (can be specified multiple times)", dest="verbosity", default=0)
318+
log_group.add_argument("-q", "--quiet", action="store_true", help="Suppress informational logs (errors still printed)", default=False)
319+
log_group.add_argument("--ts", action="store_true", help="Add timestamps to log messages", default=False)
320+
# fmt: on
321+
322+
args, remaining = parser.parse_known_args()
323+
args.console = Console()
324+
325+
logger.init_from_args(args.verbosity, args.quiet, args.ts)
326+
if args.verbosity > 0:
327+
args.console.print(f"icspacket v{__version__}\n")
328+
329+
conn = init_mms_connection(
330+
args.host,
331+
args.port,
332+
args.auth,
333+
args.auth_stdin,
334+
args.timeout,
335+
args.max_tpdu_size,
336+
)
337+
if conn is None:
338+
sys.exit(1)
339+
340+
client = MMSClient(conn)
341+
try:
342+
if remaining:
343+
client.onecmd_plus_hooks(" ".join(remaining))
344+
else:
345+
client.cmdloop()
346+
except KeyboardInterrupt:
347+
logging.error("Operation cancelled by user...")
348+
except Exception as e:
349+
logging.exception("An unexpected error occurred: %s", e)
350+
sys.exit(1)
351+
finally:
352+
logging.debug("Closing MMS connection...")
353+
conn.close()
354+
355+
356+
if __name__ == "__main__":
357+
cli_main()

0 commit comments

Comments
 (0)