jiracli/jiracli/cli.py

462 lines
14 KiB
Python
Raw Normal View History

2022-09-15 21:36:04 +02:00
import configparser
2022-09-14 22:40:53 +02:00
import functools
2022-09-15 21:36:04 +02:00
import os
from pathlib import Path
2022-09-20 22:55:04 +02:00
from textwrap import dedent, indent, shorten, wrap
2022-09-14 22:40:53 +02:00
from typing import Any, Callable, Dict
import click
import editor
2022-09-15 21:36:04 +02:00
# Location of the jiracli INI config file.
# The XDG Base Directory spec treats an unset *and* an empty XDG_CONFIG_HOME
# the same way (fall back to ~/.config).  Using `or` instead of a `.get()`
# default covers the empty-string case, which would otherwise produce a path
# relative to the current working directory.
CONFIG_PATH = Path(os.environ.get('XDG_CONFIG_HOME') or '~/.config').expanduser() / 'jiracli' / 'config'
def get_config():
    """Parse the jiracli config file and return the parser.

    The file at ``CONFIG_PATH`` is read with :class:`configparser.ConfigParser`.

    Returns:
        The populated ``ConfigParser`` instance.

    Raises:
        ValueError: if the parsed configuration has no ``[server]`` section.
    """
    parser = configparser.ConfigParser()
    parser.read(CONFIG_PATH)
    if parser.has_section('server'):
        return parser
    raise ValueError(f"Config in {CONFIG_PATH} does not contain a section [server]")
2022-09-14 22:40:53 +02:00
def ipython():
    """Start an embedded IPython shell."""
    # IPython is imported inside the function, so it is only loaded when the
    # shell is actually requested.
    from IPython import embed
    embed()
def _jira_client_kwargs(server_config):
    """Build the keyword arguments for ``jira.JIRA`` from the [server] section.

    Args:
        server_config: The ``[server]`` section of the parsed configuration.

    Returns:
        A dict with ``options`` (server URL and certificate verification),
        ``timeout``, ``max_retries`` and either ``basic_auth`` or
        ``token_auth``.

    Raises:
        ValueError: if ``host`` or the credentials are missing, or if
            ``skip_cert_verify``, ``timeout`` or ``max_retries`` cannot be
            parsed as boolean/integer.
    """
    if 'host' not in server_config:
        raise ValueError("Need host in [server] to know which server to connect to.")

    skip_cert_verify = server_config.getboolean('skip_cert_verify', False)
    jira_kwargs = dict(
        options={
            'server': server_config['host'],
            'verify': not skip_cert_verify,
        },
        timeout=server_config.getint('timeout', 2),
        max_retries=server_config.getint('max_retries', 2),
    )

    # user/password takes precedence over a token when both are configured.
    if 'password' in server_config:
        if 'user' not in server_config:
            raise ValueError('If password is set in [server], user needs to be set, too.')
        jira_kwargs['basic_auth'] = (server_config['user'], server_config['password'])
    elif 'token' in server_config:
        jira_kwargs['token_auth'] = server_config['token']
    else:
        raise ValueError("Need either user and password or token in [server] for login.")
    return jira_kwargs


@click.group()
@click.pass_context
def main(ctx):
    """Command line client for Jira."""
    from jira import JIRA
    ctx.ensure_object(dict)

    # Report configuration problems as click errors (as the other commands in
    # this file do) instead of letting a raw traceback reach the user.
    try:
        config = get_config()
        jira_kwargs = _jira_client_kwargs(config['server'])
    except ValueError as err:
        raise click.ClickException(str(err)) from err
    ctx.obj['config'] = config

    if not jira_kwargs['options']['verify']:
        # Certificate verification was switched off on purpose, so silence the
        # warning urllib3 would otherwise emit.
        import urllib3
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    # Subcommands pick the client up from the context object (see pass_client).
    ctx.obj['client'] = JIRA(**jira_kwargs)
2022-09-14 22:40:53 +02:00
def pass_client(fn):
    """Decorator that injects the Jira client as the first argument of *fn*.

    The client is looked up under the ``'client'`` key of the click context
    object; all other arguments are forwarded unchanged.
    """
    def _with_client(ctx_obj, *args, **kwargs):
        return fn(ctx_obj['client'], *args, **kwargs)

    # Same layering as stacking @functools.wraps(fn) on top of @click.pass_obj.
    return functools.update_wrapper(click.pass_obj(_with_client), fn)
2022-09-15 21:36:04 +02:00
def pass_config(fn):
    """Decorator that injects the parsed config as the first argument of *fn*.

    The config is looked up under the ``'config'`` key of the click context
    object; all other arguments are forwarded unchanged.
    """
    def _with_config(ctx_obj, *args, **kwargs):
        return fn(ctx_obj['config'], *args, **kwargs)

    # Same layering as stacking @functools.wraps(fn) on top of @click.pass_obj.
    return functools.update_wrapper(click.pass_obj(_with_config), fn)
2022-09-14 22:40:53 +02:00
@main.command()
@pass_client
def version(client):
    # Formats the first five entries of client.sys_version_info as
    # "Version A.B.C-DpE" and prints the result.
    template = 'Version {}.{}.{}-{}p{}'
    print(template.format(*client.sys_version_info))
@main.command()
@click.option('--project', help='Project to filter for. Default: take from config [filters]/default_project')
# multiple=True so that `components` is a tuple of names; without it the single
# string would be iterated character by character when building the query.
@click.option('--component', 'components', multiple=True,
              help='Filter for issues with *any* of the given components')
@click.option('--status', multiple=True, help='Filter for issues with *any* of the given statuses')
@click.option('--label', 'labels', multiple=True, help='Filter for issues with all the given labels')
@click.option('--text', 'texts', multiple=True, help='Filter for issues containing all the given strings')
@click.option('--flagged/--not-flagged', is_flag=True, default=None, help='Filter issues being flagged or not flagged')
@click.option('--order', help='Sort by this field')
@click.option('--asc/--desc', 'asc', help='Sort ascending/descending. Default: descending')
@click.option('--jql', default='', help='Provide your own JQL query to search for issues. '
                                        'Setting this ignores all other query options except --project.')
@pass_client
@pass_config
def issues(config, client, project, components, status, labels, texts, flagged, order, asc, jql):
    """Search for issues and print one line per match."""
    # The [filters] section (and its default_project key) is optional; the
    # fallback keeps `project` untouched when either one is missing.
    if not project:
        project = config.get('filters', 'default_project', fallback=project)

    # TODO add filtering by created since
    # TODO add filtering by updated
    # TODO support pagination I guess? i.e. a limit
    if not jql:
        # Build the query from the individual options; every entry of
        # `filters` is AND-ed together.
        filters = []
        if texts:
            texts_jql = ' AND '.join(f'text ~ "{text}"' for text in texts)
            filters.append(f"( {texts_jql} )")
        if components:
            component_list = ', '.join(f'"{c}"' for c in components)
            filters.append(f"component in ({component_list})")
        if status:
            status_list = ', '.join(f'"{s}"' for s in status)
            filters.append(f"status IN ({status_list})")
        if labels:
            labels_jql = ' AND '.join(f'labels = "{label}"' for label in labels)
            filters.append(f"( {labels_jql} )")
        if flagged is not None:
            # Tri-state flag: None means "do not filter on flagged at all".
            filters.append('flagged IS NOT EMPTY' if flagged else 'flagged IS EMPTY')
        if project:
            filters.append(f"project = {project}")
        jql = ' AND '.join(filters)
        if order:
            jql += f" ORDER BY {order} {'ASC' if asc else 'DESC'}"
    elif project:
        # A hand-written query is only narrowed down to the project.
        jql = f"project = {project} AND {jql}"

    print(f"Searching with query: {jql}")
    for issue in client.search_issues(jql):
        # TODO tabulate?
        print(f"{issue.key:>8} - {issue.fields.status} - {issue.fields.created} - {shorten(issue.fields.summary, 72)}")
def get_more_attr(obj, fields):
    """Resolve a dotted attribute path such as ``'author.displayName'``.

    Args:
        obj: Object to start the lookup from.
        fields: Attribute names separated by ``.``.

    Returns:
        The value at the end of the path.  The walk stops early at the first
        falsy value on the way (e.g. ``None``), and that value is returned.

    Raises:
        AttributeError: if an attribute on the path does not exist.
    """
    current = obj
    for name in fields.split('.'):
        current = getattr(current, name)
        if not current:
            # Nothing further to descend into.
            return current
    return current
def comment_to_dict(comment):
    """Return a dict with the creation date, author display name and body of *comment*.

    The keys are the dotted attribute paths used for the lookup
    (``'created'``, ``'author.displayName'``, ``'body'``).
    """
    return {
        path: get_more_attr(comment, path)
        for path in ('created', 'author.displayName', 'body')
    }
def issue_to_yaml(issue):
    """Serialise selected fields and all comments of *issue* to a YAML string.

    The document has three top-level keys: ``info``, ``editable`` and
    ``comments``.  Field values are looked up on ``issue.fields`` via dotted
    attribute paths, which are also used as the keys.
    """
    import yaml

    def pick(paths):
        # Map each dotted path to its value on issue.fields.
        return {path: get_more_attr(issue.fields, path) for path in paths}

    info = pick(('creator.displayName', 'reporter.displayName', 'created'))
    editable = pick((
        'summary', 'description', 'status.name', 'priority.name', 'labels',
        'issuelinks', 'components', 'assignee.displayName',
    ))
    comments = [comment_to_dict(entry) for entry in issue.fields.comment.comments]
    return yaml.dump({'info': info, 'editable': editable, 'comments': comments})
# @main.command()
# @click.argument('issue_id')
# def issue(issue_id):
# try:
# issue = client.issue(issue_id)
# except JIRAError as e:
# print(f'Problem retrieving issue: {e.text}')
# return
#
# # import IPython
# # IPython.embed()
# original_content = issue_to_yaml(issue)
# print(original_content)
# # new_content = editor.edit(contents=original_content).decode('utf-8')
# # print(dir(issue))
@main.command()
def dump():
    # Scratch command: dumps a sample dict containing a multi-line string once
    # with PyYAML and once with ruamel.yaml, both using default_style '|'.
    try:
        from yaml import CDumper as Dumper
    except ImportError:
        # No C-accelerated dumper available; use the pure-Python one.
        from yaml import Dumper
    import ruamel.yaml
    # from ruamel.yaml import YAML
    import yaml
    import sys

    str_tag = 'tag:yaml.org,2002:str'

    def str_presenter(dumper, data):
        """Represent multi-line strings in literal block style (``|``).

        Ref: https://stackoverflow.com/questions/8640959/how-can-i-control-what-scalar-form-pyyaml-uses-for-my-data"""
        if '\n' in data:  # multi-line string
            return dumper.represent_scalar(str_tag, data, style='|')
        return dumper.represent_scalar(str_tag, data)

    # yaml.add_representer(str, str_presenter)
    # yaml.representer.SafeRepresenter.add_representer(str, str_presenter) # to use with safe_dum

    def yaml_multiline_string_pipe(dumper, data):
        # Variant of str_presenter that decides based on the number of lines.
        text_list = data.splitlines()
        fixed_data = "\n".join(text_list)
        if len(text_list) > 1:
            return dumper.represent_scalar(str_tag, data, style="|")
        return dumper.represent_scalar(str_tag, fixed_data)

    # yaml.add_representer(str, yaml_multiline_string_pipe)

    pyyaml_sample = {"multiline": "First line \nSecond line\nThird line"}
    print(yaml.dump(pyyaml_sample, allow_unicode=True, Dumper=Dumper, default_style="|"))

    # NOTE(review): relies on ruamel.yaml's module-level dump(); confirm that the
    # installed ruamel.yaml version still provides it.
    ruamel_sample = {"multiline": "First line \nSecond line\nThirdline"}
    ruamel.yaml.dump(ruamel_sample, sys.stdout, default_style='|')
@main.group(invoke_without_command=True)
@click.argument('issue_id')
@pass_client
@click.pass_obj
def issue(obj: Dict[str, Any], client, issue_id: str):
    # Group callback: fetches the issue once and stores it (together with its
    # id) on the click context object for the subcommands (see pass_issue).
    from jira.exceptions import JIRAError
    try:
        fetched = client.issue(issue_id)
    except JIRAError as e:
        raise click.ClickException(f"Problem retrieving {issue_id}: {e.text}")
    obj.update(issue=fetched, issue_id=issue_id)
def pass_issue(fn: Callable) -> Callable:
    """Decorator that injects the selected issue as the first argument of *fn*.

    The issue is looked up under the ``'issue'`` key of the click context
    object; all other arguments are forwarded unchanged.
    """
    def _with_issue(ctx_obj, *args, **kwargs):
        return fn(ctx_obj['issue'], *args, **kwargs)

    # Same layering as stacking @functools.wraps(fn) on top of @click.pass_obj.
    return functools.update_wrapper(click.pass_obj(_with_issue), fn)
@issue.group()
def link():
    """List and create links of the issue"""
    # Pure command group: the work happens in the subcommands registered on it.
2022-09-20 22:55:04 +02:00
def _format_links(client, issue):
    """Render the issue links and remote links of *issue* as text.

    Args:
        client: Jira client; only ``remote_links(issue.id)`` is called on it.
        issue: Issue whose ``fields.issuelinks`` (if present) are listed.

    Returns:
        The formatted lines joined by newlines, or an empty string when the
        issue has neither issue links nor remote links.
    """
    lines = []

    # The attribute may be missing or None; treat both as "no links".
    issuelinks = getattr(issue.fields, 'issuelinks', None) or []
    if issuelinks:
        lines.append("Issue Links")
        # TODO group by type?
        for issuelink in issuelinks:
            other_issue = (getattr(issuelink, 'inwardIssue', None)
                           or getattr(issuelink, 'outwardIssue', None))
            if other_issue is None:
                # Neither side of the link is available; keep the link visible
                # instead of failing on `None.fields`.
                lines.append(f" {issuelink.type} - (linked issue not available)")
                continue
            summary = indent('\n'.join(wrap(other_issue.fields.summary, width=72)), prefix=' ' * 4)
            lines.append(f" {issuelink.type} - {other_issue.key}\n{summary}")

    remote_links = client.remote_links(issue.id)
    if remote_links:
        if lines:
            # Blank line between the two sections.
            lines.append('')
        lines.append("Remote Links:")
        for link in remote_links:
            # TODO tabulate?
            lines.append(f" {link.object.title} => {link.object.url}")
    return '\n'.join(lines)
@link.command('list')
@pass_issue
@pass_client
def link_list(client, issue):
    # Print the formatted issue links and remote links of the selected issue.
    rendered = _format_links(client, issue)
    print(rendered)
2022-09-14 22:40:53 +02:00
2022-09-15 21:36:04 +02:00
def prompt(description: str, *, multiline_ok: bool = True, initial_content: str = '', empty_ok: bool = False) -> str:
    """Collect text from the user through the external editor.

    The editor is opened with *initial_content*, a divider line and the
    *description* below it.  Whatever the user leaves above the last divider
    line is returned.

    Args:
        description: Instructions shown below the divider line.
        multiline_ok: Whether more than one line of input is accepted.
        initial_content: Text pre-filled above the divider line.
        empty_ok: Whether input without any non-empty line is accepted.

    Returns:
        The lines above the divider, joined by newlines.

    Raises:
        click.ClickException: if the editor returns nothing, the divider line
            is gone, or the input violates *multiline_ok* / *empty_ok*.
    """
    divider_line = '--- Everything after this line will be removed'
    template = f"{initial_content}\n{divider_line}\n\n{description}"
    edited = editor.edit(contents=template).decode('utf-8')
    if not edited:
        raise click.ClickException('Empty content. Cannot continue.')

    all_lines = edited.splitlines()
    # Walk up from the bottom so that the last divider line is the one used.
    for cut in range(len(all_lines) - 1, -1, -1):
        if all_lines[cut] == divider_line:
            break
    else:
        raise click.ClickException('Could not find divider line. Please keep the footer intact.')
    kept = all_lines[:cut]

    if len(kept) > 1 and not multiline_ok:
        # TODO build a retry into this?
        raise click.ClickException('Input needs to be a single line only')
    if not (empty_ok or any(kept)):
        raise click.ClickException('Empty content. Cannot continue.')
    return '\n'.join(kept)
@link.command('create')
@click.argument('URL', required=False)
@click.argument('TITLE', required=False)
@pass_issue
@pass_client
def link_create(client, issue, url: str, title: str):
    # Adds a simple link to the issue.  URL and title that were not given on
    # the command line are asked for in the editor, one line each.
    if url is None:
        url = prompt(f"Input the URL for the link to add to {issue.key}", multiline_ok=False)
    if title is None:
        title_help = f"Input a single line of title for {url}. This will be added to {issue.key}."
        title = prompt(title_help, multiline_ok=False)
    link_payload = {'url': url, 'title': title}
    client.add_simple_link(issue.key, link_payload)
@link.command('issue')
@pass_issue
@pass_client
def link_issue(client, issue):
    """Link to another issue"""
    # Placeholder: the command is registered but has no implementation yet.
    raise NotImplementedError()
@issue.command('open')
@pass_issue
def issue_open(issue):
    """Open the issue in the default browser"""
    # click.launch picks the platform's opener, so the command is not tied to
    # systems that ship `xdg-open`.
    click.launch(issue.permalink())
@issue.group()
def label():
    """List and edit the labels of the issue"""
    # Pure command group: the work happens in the subcommands registered on it.
@label.command('list')
@pass_issue
def label_list(issue):
    """List all labels on the issue"""
    # One label per line; an issue without labels still prints an empty line.
    print(*issue.fields.labels, sep='\n')
@label.command('edit')
@pass_issue
def label_edit(issue):
    """Edit the labels of the issue"""
    import jira

    current_labels = issue.fields.labels
    # TODO add a configurable list of "often used" labels into the description
    edited = prompt('Add or remove labels by adding them line by line or removing one',
                    initial_content='\n'.join(current_labels),
                    empty_ok=True)
    # Blank lines and stray whitespace in the editor must not turn into
    # (empty or padded) labels.
    labels = {line.strip() for line in edited.splitlines()} - {''}

    original_labels = set(current_labels)
    if labels == original_labels:
        print(f"Not updating {issue.key} as the labels are the same.")
        return

    # One add/remove operation per changed label; sorted to keep the request
    # deterministic.
    changes = [{'add': added} for added in sorted(labels - original_labels)]
    changes += [{'remove': removed} for removed in sorted(original_labels - labels)]

    # inspired by jira.Issue.add_field_value
    jira.resources.Resource.update(issue, fields={"update": {"labels": changes}})
2022-09-15 21:36:04 +02:00
@issue.command()
@pass_issue
def debug(issue):
    # Opens an embedded IPython shell from within this function, where the
    # selected `issue` is a local variable.
    from IPython import embed
    embed()
2022-09-15 21:36:04 +02:00
@issue.command()
@pass_issue
@pass_client
def show(client, issue):
    """Show a lot of information about the issue"""
    fields = issue.fields

    def _header(s):
        # print(f"\n {s}\n {'+' * len(s)}")
        print(f"\n +++ {s} +++")

    def _wrapped(text, prefix):
        # Wrap to textwrap's default width but keep the author's own line
        # breaks, then indent every line by `prefix`.
        return indent('\n'.join(wrap(text, replace_whitespace=False)), prefix=prefix)

    print(f"{issue.key} on {fields.status} as {fields.issuetype} assigned to {fields.assignee}")
    parent = getattr(fields, 'parent', None)
    if parent:
        print(f" parent: {parent.key} on {parent.fields.status} as {parent.fields.issuetype}")
        summary = indent('\n'.join(wrap(parent.fields.summary, width=72)), prefix=' ' * 4)
        print(f"{summary}")

    details = [
        f"created: {fields.created}",
        # Components are not plain strings, so str.join() needs them converted
        # explicitly.
        f"components: {', '.join(str(component) for component in fields.components)}",
        f"flagged: {getattr(fields, 'flagged', False)}",
        f"labels: {', '.join(fields.labels)}",
        f"priority: {fields.priority}",
        f"reporter: {fields.reporter}",
    ]
    print(indent('\n'.join(details), ' ' * 2))

    _header('summary')
    print(_wrapped(fields.summary, ' ' * 2))

    _header('description')
    # The description may be empty/None.
    print(_wrapped(fields.description or '', ' ' * 2))

    _header('attachments')
    # Tolerate issues on which the attachment field is missing or None.
    for attachment in getattr(fields, 'attachment', None) or []:
        print(f" * {attachment.filename} by {attachment.author}")
        print(f" {attachment.content}")
        print(f" {attachment.mimeType} with {attachment.size} B")

    _header('links')
    links = _format_links(client, issue)
    if links:
        print(indent(links, prefix=' ' * 2))

    _header('comments')
    for i, comment in enumerate(fields.comment.comments):
        if i != 0:
            # Blank line between consecutive comments.
            print('')
        print(f" {comment.created} - {comment.author}")
        print(_wrapped(comment.body, ' ' * 4))
    print('')