import configparser
import functools
import os
from pathlib import Path
from textwrap import dedent, indent, shorten, wrap
from typing import Any, Callable, Dict

import click
import editor

# An empty XDG_CONFIG_HOME is treated like an unset one; otherwise Path('')
# would silently make the config path relative to the current directory.
CONFIG_PATH = Path(os.environ.get('XDG_CONFIG_HOME') or '~/.config').expanduser() / 'jiracli' / 'config'


def get_config():
    """Read the configuration file at CONFIG_PATH.

    Returns:
        The populated ``configparser.ConfigParser``.

    Raises:
        ValueError: if the parsed configuration has no ``[server]`` section
            (``ConfigParser.read`` ignores a missing file, so this also covers
            a config file that does not exist).
    """
    config = configparser.ConfigParser()
    config.read(CONFIG_PATH)
    if not config.has_section('server'):
        raise ValueError(f"Config in {CONFIG_PATH} does not contain a section [server]")
    return config


def ipython():
    """Drop into an embedded IPython shell (debugging helper)."""
    import IPython
    IPython.embed()


@click.group()
@click.pass_context
def main(ctx):
    """Command line client for Jira."""
    from jira import JIRA

    ctx.ensure_object(dict)
    config = get_config()
    ctx.obj['config'] = config

    server_config = config['server']
    if 'host' not in server_config:
        # Fail with a readable message instead of a bare KeyError below.
        raise ValueError("Need host in [server] to know which server to connect to.")

    skip_cert_verify = server_config.getboolean('skip_cert_verify', False)
    if skip_cert_verify:
        # The user explicitly opted out of verification, so do not spam a
        # warning for every request.
        import urllib3
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    jira_kwargs = dict(
        options={
            'server': server_config['host'],
            'verify': not skip_cert_verify,
        },
        timeout=server_config.getint('timeout', 2),
        max_retries=server_config.getint('max_retries', 2),
    )

    # Password authentication takes precedence over token authentication.
    if 'password' in server_config:
        if 'user' not in server_config:
            raise ValueError('If password is set in [server], user needs to be set, too.')
        jira_kwargs['basic_auth'] = (server_config['user'], server_config['password'])
    elif 'token' in server_config:
        jira_kwargs['token_auth'] = server_config['token']
    else:
        raise ValueError("Need either user and password or token in [server] for login.")

    ctx.obj['client'] = JIRA(**jira_kwargs)


def pass_client(fn):
    """Decorator passing ``ctx.obj['client']`` as first argument to ``fn``."""
    @functools.wraps(fn)
    @click.pass_obj
    def wrapped(obj, *args, **kwargs):
        return fn(obj['client'], *args, **kwargs)
    return wrapped


def pass_config(fn):
    """Decorator passing ``ctx.obj['config']`` as first argument to ``fn``."""
    @functools.wraps(fn)
    @click.pass_obj
    def wrapped(obj, *args, **kwargs):
        return fn(obj['config'], *args, **kwargs)
    return wrapped


@main.command()
@pass_client
def version(client):
    """Print the version information found in the client's sys_version_info"""
    print('Version {}.{}.{}-{}p{}'.format(*client.sys_version_info))


@main.command()
@click.option('--project', help='Project to filter for. Default: take from config [filters]/default_project')
@click.option('--component', 'components', multiple=True,
              help='Filter for issues with *any* of the given components')
@click.option('--status', multiple=True,
              help='Filter for issues with *any* of the given statuses. '
                   '! at the beginning means NOT.')
@click.option('--label', 'labels', multiple=True, help='Filter for issues with all the given labels')
@click.option('--text', 'texts', multiple=True, help='Filter for issues containing all the given strings')
@click.option('--flagged/--not-flagged', is_flag=True, default=None,
              help='Filter issues being flagged or not flagged')
@click.option('--watching/--not-watching', is_flag=True, default=None,
              help='Filter issues I watch or do not watch')
@click.option('--order', help='Sort by this field')
@click.option('--asc/--desc', 'asc', help='Sort ascending/descending. Default: descending')
@click.option('--jql', help='Provide your own JQL query string to include in the search via AND.')
@pass_client
@pass_config
def issues(config, client, project, components, status, labels, texts, flagged, watching, order, asc, jql):
    """Search for issues matching all the given filters"""
    if project is None:
        # fallback=None covers both a missing [filters] section and a missing
        # default_project option; a plain config.get() raises NoOptionError
        # for the latter.
        project = config.get('filters', 'default_project', fallback=None)

    # TODO add filtering by created since
    # TODO add filtering by updated
    # TODO support pagination I guess? i.e. a limit
    # NOTE(review): values are interpolated into the JQL verbatim, so a value
    # containing a double quote breaks the query - confirm whether escaping
    # is wanted here.
    filters = []
    if jql:
        filters.append(jql)

    if texts:
        texts_jql = ' AND '.join(f'text ~ "{text}"' for text in texts)
        filters.append(f"( {texts_jql} )")

    if components:
        component_list = ', '.join(f'"{c}"' for c in components)
        filters.append(f"component in ({component_list})")

    if status:
        # Statuses prefixed with "!" are excluded, all others are included.
        wanted = ', '.join(f'"{s}"' for s in status if not s.startswith('!'))
        if wanted:
            filters.append(f"status IN ({wanted})")
        unwanted = ', '.join(f'"{s[1:]}"' for s in status if s.startswith('!'))
        if unwanted:
            filters.append(f"status NOT IN ({unwanted})")

    if labels:
        labels_jql = ' AND '.join(f'labels = "{label}"' for label in labels)
        filters.append(f"( {labels_jql} )")

    # flagged/watching are tri-state: None means "do not filter".
    if flagged is not None:
        filters.append('flagged IS NOT EMPTY' if flagged else 'flagged IS EMPTY')

    if watching is not None:
        filters.append('watcher = currentUser()' if watching else 'watcher != currentUser()')

    if project:
        filters.append(f"project = {project}")

    jql = ' AND '.join(filters)
    if order:
        jql += f" ORDER BY {order} {'ASC' if asc else 'DESC'}"

    print(f"Searching with query: {jql}")
    for issue in client.search_issues(jql):
        # TODO tabulate?
        print(f"{issue.key:>8} - {issue.fields.status} - {issue.fields.created} - "
              f"{shorten(issue.fields.summary, 72)}")


def get_more_attr(obj, fields):
    """Follow a dotted attribute path on ``obj``.

    Args:
        obj: object to start from.
        fields: attribute names separated by ``.``, e.g. ``author.displayName``.

    Returns:
        The value at the end of the path, or the first falsy value found on
        the way (the lookup stops there instead of raising on e.g. ``None``).
    """
    for field in fields.split('.'):
        obj = getattr(obj, field)
        if not obj:
            break
    return obj


def comment_to_dict(comment):
    """Return creation date, author display name and body of ``comment`` as a dict."""
    fields = ['created', 'author.displayName', 'body']
    return {f: get_more_attr(comment, f) for f in fields}


def issue_to_yaml(issue):
    """Serialize informational fields, editable fields and comments of ``issue`` to YAML."""
    import yaml

    info_fields = ['creator.displayName', 'reporter.displayName', 'created']
    editable_fields = ['summary', 'description', 'status.name', 'priority.name', 'labels', 'issuelinks',
                       'components', 'assignee.displayName']
    d = {
        'info': {f: get_more_attr(issue.fields, f) for f in info_fields},
        'editable': {f: get_more_attr(issue.fields, f) for f in editable_fields},
        'comments': [comment_to_dict(c) for c in issue.fields.comment.comments],
    }
    return yaml.dump(d)


# @main.command()
# @click.argument('issue_id')
# def issue(issue_id):
#     try:
#         issue = client.issue(issue_id)
#     except JIRAError as e:
#         print(f'Problem retrieving issue: {e.text}')
#         return
#
#     # import IPython
#     # IPython.embed()
#     original_content = issue_to_yaml(issue)
#     print(original_content)
#     # new_content = editor.edit(contents=original_content).decode('utf-8')
#     # print(dir(issue))


@main.command()
def dump():
    """Print a sample multiline string as YAML using PyYAML and ruamel.yaml"""
    # Prefer the C implementation of the dumper if PyYAML was built with it.
    try:
        from yaml import CDumper as Dumper
    except ImportError:
        from yaml import Dumper
    import ruamel.yaml
    # from ruamel.yaml import YAML
    import yaml

    def str_presenter(dumper, data):
        """configures yaml for dumping multiline strings
        Ref: https://stackoverflow.com/questions/8640959/how-can-i-control-what-scalar-form-pyyaml-uses-for-my-data"""
        if data.count('\n') > 0:  # check for multiline string
            return dumper.represent_scalar('tag:yaml.org,2002:str', data, style='|')
        return dumper.represent_scalar('tag:yaml.org,2002:str', data)

    # yaml.add_representer(str, str_presenter)
    # yaml.representer.SafeRepresenter.add_representer(str, str_presenter)  # to use with safe_dump

    def yaml_multiline_string_pipe(dumper, data):
        """Represent strings with more than one line in literal block style."""
        text_list = data.splitlines()
        fixed_data = "\n".join(text_list)
        if len(text_list) > 1:
            return dumper.represent_scalar('tag:yaml.org,2002:str', data, style="|")
        return dumper.represent_scalar('tag:yaml.org,2002:str', fixed_data)

    # yaml.add_representer(str, yaml_multiline_string_pipe)

    # Neither representer above is registered at the moment; the block style
    # is forced through default_style instead.
    print(yaml.dump({"multiline": "First line \nSecond line\nThird line"}, allow_unicode=True, Dumper=Dumper,
                    default_style="|"))

    import sys
    # NOTE(review): the module-level ruamel.yaml.dump() may not exist in
    # current ruamel.yaml releases - confirm against the installed version.
    ruamel.yaml.dump({"multiline": "First line \nSecond line\nThirdline"}, sys.stdout, default_style='|')


@main.group(invoke_without_command=True)
@click.argument('issue_id')
@pass_client
@click.pass_obj
def issue(obj: Dict[str, Any], client, issue_id: str):
    """Work with the issue ISSUE_ID"""
    from jira.exceptions import JIRAError

    try:
        issue = client.issue(issue_id)
    except JIRAError as e:
        raise click.ClickException(f"Problem retrieving {issue_id}: {e.text}") from e

    # Stored on the context object so that subcommands can use @pass_issue.
    obj['issue'] = issue
    obj['issue_id'] = issue_id


def pass_issue(fn: Callable) -> Callable:
    """Decorator passing ``ctx.obj['issue']`` as first argument to ``fn``."""
    @functools.wraps(fn)
    @click.pass_obj
    def wrapped(obj, *args, **kwargs):
        return fn(obj['issue'], *args, **kwargs)
    return wrapped


@issue.group()
def link():
    """Manage the links of the issue"""


def _format_links(client, issue):
    """Build a printable listing of the issue links and remote links of ``issue``.

    Args:
        client: Jira client used to fetch the remote links.
        issue: the issue whose links are listed.

    Returns:
        A string with one section per kind of link; empty if there are none.
    """
    lines = []
    issuelinks = getattr(issue.fields, 'issuelinks', [])
    if issuelinks:
        lines.append("Issue Links")
        # TODO group by type?
        for issuelink in issuelinks:
            other_issue = getattr(issuelink, 'inwardIssue', getattr(issuelink, 'outwardIssue', None))
            if other_issue is None:
                # Neither direction is set; there is nothing to show and
                # accessing .fields on None would crash.
                continue
            summary = indent('\n'.join(wrap(other_issue.fields.summary, width=72)), prefix=' ' * 4)
            lines.append(f" {issuelink.type} - {other_issue.key}\n{summary}")

    remote_links = client.remote_links(issue.id)
    if remote_links:
        if lines:
            # Blank line between the two sections.
            lines.append('')
        lines.append("Remote Links:")
        for remote_link in remote_links:
            # TODO tabulate?
            lines.append(f" {remote_link.object.title} => {remote_link.object.url}")

    return '\n'.join(lines)


@link.command('list')
@pass_issue
@pass_client
def link_list(client, issue):
    """List all links of the issue"""
    print(_format_links(client, issue))


def prompt(description: str, *, multiline_ok: bool = True, initial_content: str = '',
           empty_ok: bool = False) -> str:
    """Ask the user for text by opening an editor.

    The editor buffer consists of ``initial_content``, a divider line and the
    ``description``. Everything from the last divider line on is discarded.

    Args:
        description: help text shown below the divider line.
        multiline_ok: whether more than one line of input is acceptable. If
            not, blank lines are ignored and the single remaining line is
            returned without surrounding whitespace.
        initial_content: text to pre-fill the editor with.
        empty_ok: whether input without any non-whitespace content is acceptable.

    Returns:
        The text above the divider line, lines joined by ``\\n``.

    Raises:
        click.ClickException: if the content is empty while ``empty_ok`` is
            false, the divider line was removed, or several lines were given
            while ``multiline_ok`` is false.
    """
    divider_line = '--- Everything after this line will be removed'
    contents = f"{initial_content}\n{divider_line}\n\n{description}"
    contents = editor.edit(contents=contents).decode('utf-8')
    if not contents:
        raise click.ClickException('Empty content. Cannot continue.')

    contents_lines = contents.splitlines()
    # Search from the end so that a divider-like line inside the user's text
    # does not cut the content short.
    for i, line in enumerate(reversed(contents_lines), start=1):
        if line == divider_line:
            break
    else:
        raise click.ClickException('Could not find divider line. Please keep the footer intact.')
    contents_lines = contents_lines[:-i]

    if not multiline_ok:
        # A stray blank line (e.g. the empty first line of the template) must
        # not turn valid single-line input into an error.
        contents_lines = [line.strip() for line in contents_lines if line.strip()]
        if len(contents_lines) > 1:
            # TODO build a retry into this?
            raise click.ClickException('Input needs to be a single line only')

    if not empty_ok and not any(line.strip() for line in contents_lines):
        raise click.ClickException('Empty content. Cannot continue.')

    return '\n'.join(contents_lines)


@link.command('create')
@click.argument('URL', required=False)
@click.argument('TITLE', required=False)
@pass_issue
@pass_client
def link_create(client, issue, url: str, title: str):
    """Add a remote link to the issue, asking for URL and TITLE if not given"""
    if url is None:
        url = prompt(f"Input the URL for the link to add to {issue.key}", multiline_ok=False)
    if title is None:
        title = prompt(f"Input a single line of title for {url}. This will be added to {issue.key}.",
                       multiline_ok=False)
    client.add_simple_link(issue.key, {'url': url, 'title': title})


@link.command('issue')
@pass_issue
@pass_client
def link_issue(client, issue):
    """Link to another issue"""
    raise NotImplementedError


@issue.command('open')
@pass_issue
def issue_open(issue):
    """Open the issue with the default application for its URL"""
    # click.launch picks the platform's opener (xdg-open on Linux) instead of
    # hard-coding xdg-open, which only exists on freedesktop systems.
    click.launch(issue.permalink())


@issue.group()
def label():
    """Manage the labels of the issue"""


@label.command('list')
@pass_issue
def label_list(issue):
    """List all labels on the issue"""
    print('\n'.join(issue.fields.labels))


@label.command('edit')
@pass_issue
def label_edit(issue):
    """Edit the labels of the issue"""
    import jira

    original_labels = issue.fields.labels
    # TODO add a configurable list of "often used" labels into the description
    edited = prompt('Add or remove labels by adding them line by line or removing one',
                    initial_content='\n'.join(original_labels), empty_ok=True)
    # Blank lines are not labels; without this filter an empty editor buffer
    # or a stray newline would try to add the label ''.
    labels = {line.strip() for line in edited.splitlines() if line.strip()}
    original_labels = set(original_labels)

    if labels == original_labels:
        print(f"Not updating {issue.key} as the labels are the same.")
        return

    # Sorted only to make the request deterministic.
    changes = [{'add': name} for name in sorted(labels - original_labels)]
    changes.extend({'remove': name} for name in sorted(original_labels - labels))

    # inspired by jira.Issue.add_field_value
    jira.resources.Resource.update(issue, fields={"update": {"labels": changes}})


@issue.command()
@pass_issue
@pass_client
def debug(client, issue):
    """Open an IPython shell inside this command"""
    import IPython
    IPython.embed()


@issue.command()
@pass_issue
@pass_client
def show(client, issue):
    """Show a lot of information about the issue"""
    print(f"{issue.key} on {issue.fields.status} as {issue.fields.issuetype} assigned to {issue.fields.assignee}")
    if getattr(issue.fields, 'parent', None):
        parent = issue.fields.parent
        print(f" parent: {parent.key} on {parent.fields.status} as {parent.fields.issuetype}")
        summary = indent('\n'.join(wrap(parent.fields.summary, width=72)), prefix=' ' * 4)
        print(f"{summary}")

    # TODO handle customfields in a good manner, e.g. let the user enable
    # showing them by a custom name or their own. use cache for that.
    # NOTE(review): customfield_13216 is presumably the "flagged" field of one
    # specific Jira instance - confirm before relying on it elsewhere.
    print(indent(dedent(f"""\
        created: {issue.fields.created}
        components: {', '.join(c.name for c in issue.fields.components)}
        flagged: {bool(getattr(issue.fields, 'customfield_13216', False))}
        labels: {', '.join(issue.fields.labels)}
        priority: {issue.fields.priority}
        reporter: {issue.fields.reporter}"""), ' ' * 2))

    def _header(s):
        # print(f"\n {s}\n {'+' * len(s)}")
        print(f"\n +++ {s} +++")

    summary = indent('\n'.join(wrap(issue.fields.summary, replace_whitespace=False)), prefix=' ' * 2)
    _header('summary')
    print(summary)

    # The description may be None; wrap() needs a string.
    description = indent('\n'.join(wrap(issue.fields.description or '', replace_whitespace=False)),
                         prefix=' ' * 2)
    _header('description')
    print(description)

    _header('attachments')
    # Tolerate issues without an attachment field instead of crashing.
    for attachment in getattr(issue.fields, 'attachment', None) or []:
        print(f" * {attachment.filename} by {attachment.author}")
        print(f" {attachment.content}")
        print(f" {attachment.mimeType} with {attachment.size} B")

    _header('links')
    links = _format_links(client, issue)
    if links:
        print(indent(links, prefix=' ' * 2))

    _header('comments')
    for i, comment in enumerate(issue.fields.comment.comments):
        if i != 0:
            # Blank line between comments, but not before the first one.
            print('')
        print(f" {comment.created} - {comment.author} - {comment.id}")
        body = indent('\n'.join(wrap(comment.body, replace_whitespace=False)), ' ' * 4)
        print(body)
    print('')


@issue.group()
def comment():
    """Manage the comments of the issue"""


@comment.command('create')
@pass_issue
@pass_client
def comment_create(client, issue):
    """Create a comment on the issue"""
    # TODO add some help text for the syntax in the description
    # TODO add all issue info to the description, so we can autocomplete in the
    # editor
    text = prompt(f"Comment text to add as new comment to {issue.key}")
    client.add_comment(issue.key, text)


def get_comment(ctx, param, comment_id):
    """click callback returning a Comment for the comment_id value given by the user

    If no comment_id was given, the user picks one of the issue's comments
    from a selection menu.

    Raises:
        click.ClickException: if there is nothing to select from or the
            comment could not be retrieved.
    """
    from jira.exceptions import JIRAError

    issue = ctx.obj['issue']
    if not comment_id:
        from clintermission import cli_select_item

        options = []
        for candidate in issue.fields.comment.comments:
            # TODO make this roughly terminal width or add a shorten() method
            # with … at the end
            body = candidate.body.replace('\n', ' ')[:64]
            options.append((f"{candidate.created} - {candidate.author} - {body}", candidate.id))
        if not options:
            # Do not open a selection menu without any entries.
            raise click.ClickException(f"Issue {issue.key} has no comments.")
        _, comment_id = cli_select_item(options)

    client = ctx.obj['client']
    try:
        return client.comment(issue.key, comment_id, expand=['body'])
    except JIRAError as e:
        raise click.ClickException(f"Problem retrieving {comment_id} for issue {issue.key}: {e.text}") from e


@comment.command('edit')
@click.argument('comment', required=False, callback=get_comment)
@pass_issue
@pass_client
def comment_edit(client, issue, comment):
    """Edit the text of an issues comment"""
    text = prompt(f"Comment text to set as new body for comment {comment.id} on {issue.key}",
                  initial_content=comment.body)
    if text == comment.body:
        print("No changes. Aborting.")
        return
    comment.update(body=text)


@comment.command('delete')
@click.argument('comment', required=False, callback=get_comment)
@pass_issue
@pass_client
def comment_delete(client, issue, comment):
    """Delete a comment of the issue after confirmation"""
    # The confirmation question refers to "this ^", so show what is about to
    # be deleted first (same layout as in `show`).
    print(f" {comment.created} - {comment.author} - {comment.id}")
    print(indent('\n'.join(wrap(comment.body, replace_whitespace=False)), ' ' * 4))
    click.confirm("Do you really want to delete this ^ ?", abort=True)
    comment.delete()