put_parser.add_argument('local_path', type=str, action=path_to_bytes,
help='Path of the file in the local system')
put_parser.add_argument('remote_path', type=str, action=path_to_bytes,
- help='Path of the file in the remote system.',
- nargs='?', default='.')
+ help='Path of the file in the remote system')
put_parser.add_argument('-f', '--force', action='store_true',
help='Overwrites the destination if it already exists.')
"""
Copy a local file/directory to CephFS.
"""
+ if args.local_path != b'-' and not os.path.isfile(args.local_path) \
+ and not os.path.isdir(args.local_path):
+ set_exit_code_msg(errno.ENOENT,
+ msg=f"error: "
+ f"{args.local_path.decode('utf-8')}: "
+ f"No such file or directory")
+ return
+
+ if (is_file_exists(args.remote_path) or is_dir_exists(
+ args.remote_path)) and not args.force:
+ set_exit_code_msg(msg=f"error: file/directory "
+ f"{args.remote_path.decode('utf-8')} "
+ f"exists, use --force to overwrite")
+ return
+
root_src_dir = args.local_path
root_dst_dir = args.remote_path
if args.local_path == b'.' or args.local_path == b'./':
root_dst_dir += b'/'
if args.local_path == b'-' or os.path.isfile(root_src_dir):
- if not args.force:
- if os.path.isfile(root_src_dir):
- dst_file = root_dst_dir
- if is_file_exists(dst_file):
- set_exit_code_msg(errno.EEXIST,
- f"{dst_file.decode('utf-8')}: file "
- "exists! use --force to overwrite")
- return
if args.local_path == b'-':
root_src_dir = b'-'
copy_from_local(root_src_dir, root_dst_dir)
return self.complete_filenames(text, line, begidx, endidx)
get_parser = argparse.ArgumentParser(
- description='Copy a file from Ceph File System from Local Directory.')
+ description='Copy a file from Ceph File System to Local Directory.')
get_parser.add_argument('remote_path', type=str, action=path_to_bytes,
help='Path of the file in the remote system')
get_parser.add_argument('local_path', type=str, action=path_to_bytes,
- help='Path of the file in the local system',
- nargs='?', default='.')
+ help='Path of the file in the local system')
get_parser.add_argument('-f', '--force', action='store_true',
help='Overwrites the destination if it already exists.')
"""
Copy a file/directory from CephFS to given path.
"""
+ if not is_file_exists(args.remote_path) and not \
+ is_dir_exists(args.remote_path):
+ set_exit_code_msg(errno.ENOENT, "error: no file/directory"
+ " found at specified remote "
+ "path")
+ return
+ if (os.path.isfile(args.local_path) or os.path.isdir(
+ args.local_path)) and not args.force:
+ set_exit_code_msg(msg=f"error: file/directory "
+ f"{args.local_path.decode('utf-8')}"
+ f" already exists, use --force to "
+ f"overwrite")
+ return
root_src_dir = args.remote_path
root_dst_dir = args.local_path
fname = root_src_dir.rsplit(b'/', 1)
return
copy_to_local(root_src_dir, b'-')
elif is_file_exists(args.remote_path):
- copy_to_local(root_src_dir,
- root_dst_dir + b'/' + root_src_dir)
+ copy_to_local(root_src_dir, root_dst_dir)
elif b'/' in root_src_dir and is_file_exists(fname[1], fname[0]):
copy_to_local(root_src_dir, root_dst_dir)
else:
files = list(reversed(sorted(dirwalk(root_src_dir))))
- if len(files) == 0:
- try:
- os.makedirs(root_dst_dir + b'/' + root_src_dir)
- except OSError as e:
- if args.force:
- pass
- else:
- set_exit_code_msg(e.errno, f"{root_src_dir.decode('utf-8')}: "
- "already exists! use --force to overwrite")
- return
-
for file_ in files:
dst_dirpath, dst_file = file_.rsplit(b'/', 1)
if dst_dirpath in files:
except OSError:
pass
else:
- if not args.force:
- try:
- os.stat(dst_path)
- set_exit_code_msg(errno.EEXIST, f"{file_.decode('utf-8')}: "
- "file already exists! use --force to override")
- return
- except OSError:
- copy_to_local(file_, dst_path)
- else:
- copy_to_local(file_, dst_path)
+ copy_to_local(file_, dst_path)
return 0