542 lines
17 KiB
Python
542 lines
17 KiB
Python
import configparser
|
|
import functools
|
|
import os
|
|
from pathlib import Path
|
|
from textwrap import dedent, indent, shorten, wrap
|
|
from typing import Any, Callable, Dict
|
|
|
|
import click
|
|
import editor
|
|
|
|
|
|
CONFIG_PATH = Path(os.environ.get('XDG_CONFIG_HOME', '~/.config')).expanduser() / 'jiracli' / 'config'
|
|
|
|
|
|
def get_config():
|
|
config = configparser.ConfigParser()
|
|
config.read(CONFIG_PATH)
|
|
|
|
if not config.has_section('server'):
|
|
raise ValueError(f"Config in {CONFIG_PATH} does not contain a section [server]")
|
|
|
|
return config
|
|
|
|
|
|
def ipython():
|
|
import IPython
|
|
IPython.embed()
|
|
|
|
|
|
@click.group()
|
|
@click.pass_context
|
|
def main(ctx):
|
|
from jira import JIRA
|
|
|
|
ctx.ensure_object(dict)
|
|
|
|
config = get_config()
|
|
ctx.obj['config'] = config
|
|
|
|
server_config = config['server']
|
|
|
|
skip_cert_verify = server_config.getboolean('skip_cert_verify', False)
|
|
if skip_cert_verify:
|
|
import urllib3
|
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
|
|
|
jira_kwargs = dict(
|
|
options={
|
|
'server': server_config['host'],
|
|
'verify': not skip_cert_verify
|
|
},
|
|
timeout=server_config.getint('timeout', 2),
|
|
max_retries=server_config.getint('max_retries', 2)
|
|
)
|
|
if 'password' in server_config:
|
|
if 'user' not in server_config:
|
|
raise ValueError('If password is set in [server], user needs to be set, too.')
|
|
jira_kwargs['basic_auth'] = (server_config['user'], server_config['password'])
|
|
elif 'token' in server_config:
|
|
jira_kwargs['token_auth'] = server_config['token']
|
|
else:
|
|
raise ValueError("Need either user and password or token in [server] for login.")
|
|
|
|
ctx.obj['client'] = JIRA(**jira_kwargs)
|
|
|
|
|
|
def pass_client(fn):
|
|
@functools.wraps(fn)
|
|
@click.pass_obj
|
|
def wrapped(obj, *args, **kwargs):
|
|
client = obj['client']
|
|
return fn(client, *args, **kwargs)
|
|
return wrapped
|
|
|
|
|
|
def pass_config(fn):
|
|
@functools.wraps(fn)
|
|
@click.pass_obj
|
|
def wrapped(obj, *args, **kwargs):
|
|
config = obj['config']
|
|
return fn(config, *args, **kwargs)
|
|
return wrapped
|
|
|
|
|
|
@main.command()
|
|
@pass_client
|
|
def version(client):
|
|
print('Version {}.{}.{}-{}p{}'.format(*client.sys_version_info))
|
|
|
|
|
|
@main.command()
|
|
@click.option('--project', help='Project to filter for. Default: take from config [filters]/default_project')
|
|
@click.option('--component', 'components', multiple=True, help='Filter for issues with *any* of the given components')
|
|
@click.option('--status', multiple=True, help='Filter for issues with *any* of the given statuses. '
|
|
'! at the beginning means NOT.')
|
|
@click.option('--label', 'labels', multiple=True, help='Filter for issues with all the given labels')
|
|
@click.option('--text', 'texts', multiple=True, help='Filter for issues containing all the given strings')
|
|
@click.option('--flagged/--not-flagged', is_flag=True, default=None, help='Filter issues being flagged or not flagged')
|
|
@click.option('--watching/--not-watching', is_flag=True, default=None, help='Filter issues I watch or do not watch')
|
|
@click.option('--order', help='Sort by this field')
|
|
@click.option('--asc/--desc', 'asc', help='Sort ascending/descending. Default: descending')
|
|
@click.option('--jql', help='Provide your own JQL query string to include in the search via AND.')
|
|
@pass_client
|
|
@pass_config
|
|
def issues(config, client, project, components, status, labels, texts, flagged, watching, order, asc, jql):
|
|
if project is None and config.has_section('filters') and config.get('filters', 'default_project'):
|
|
project = config['filters']['default_project']
|
|
|
|
# TODO add filtering by created since
|
|
# TODO add filtering by updated
|
|
# TODO support pagination I guess? i.e. a limit
|
|
filters = []
|
|
|
|
if jql:
|
|
filters.append(jql)
|
|
|
|
if texts:
|
|
texts_jql = ' AND '.join([f'text ~ "{text}"' for text in texts])
|
|
filters.append(f"( {texts_jql} )")
|
|
|
|
if components:
|
|
component_list = ', '.join([f'"{c}"' for c in components])
|
|
components_jql = f"component in ({component_list})"
|
|
filters.append(components_jql)
|
|
|
|
if status:
|
|
status_list = ', '.join([f'"{s}"' for s in status if not s.startswith('!')])
|
|
if status_list:
|
|
status_jql = f"status IN ({status_list})"
|
|
filters.append(status_jql)
|
|
status_list = ', '.join([f'"{s[1:]}"' for s in status if s.startswith('!')])
|
|
if status_list:
|
|
status_jql = f"status NOT IN ({status_list})"
|
|
filters.append(status_jql)
|
|
|
|
if labels:
|
|
labels_jql = ' AND '.join([f'labels = "{label}"' for label in labels])
|
|
filters.append(f"( {labels_jql} )")
|
|
|
|
if flagged is not None:
|
|
if flagged:
|
|
flagged_jql = 'flagged IS NOT EMPTY'
|
|
else:
|
|
flagged_jql = 'flagged IS EMPTY'
|
|
filters.append(flagged_jql)
|
|
|
|
if watching is not None:
|
|
if watching:
|
|
watching_jql = 'watcher = currentUser()'
|
|
else:
|
|
watching_jql = 'watcher != currentUser()'
|
|
filters.append(watching_jql)
|
|
|
|
if project:
|
|
project_jql = f"project = {project}"
|
|
filters.append(project_jql)
|
|
|
|
jql = ' AND '.join(filters)
|
|
|
|
if order:
|
|
jql += f" ORDER BY {order} {'ASC' if asc else 'DESC'}"
|
|
|
|
print(f"Searching with query: {jql}")
|
|
issues = client.search_issues(jql)
|
|
for issue in issues:
|
|
# TODO tabulate?
|
|
print(f"{issue.key:>8} - {issue.fields.status} - {issue.fields.created} - {shorten(issue.fields.summary, 72)}")
|
|
|
|
|
|
def get_more_attr(obj, fields):
|
|
fields = fields.split('.')
|
|
for field in fields:
|
|
obj = getattr(obj, field)
|
|
if not obj:
|
|
break
|
|
return obj
|
|
|
|
|
|
def comment_to_dict(comment):
|
|
fields = ['created', 'author.displayName', 'body']
|
|
return {f: get_more_attr(comment, f) for f in fields}
|
|
|
|
|
|
def issue_to_yaml(issue):
|
|
import yaml
|
|
|
|
info_fields = ['creator.displayName', 'reporter.displayName', 'created']
|
|
editable_fields = ['summary', 'description', 'status.name', 'priority.name', 'labels',
|
|
'issuelinks', 'components', 'assignee.displayName']
|
|
d = {
|
|
'info': {f: get_more_attr(issue.fields, f) for f in info_fields},
|
|
'editable': {f: get_more_attr(issue.fields, f) for f in editable_fields},
|
|
'comments': [comment_to_dict(c) for c in issue.fields.comment.comments]
|
|
}
|
|
return yaml.dump(d)
|
|
|
|
|
|
# @main.command()
|
|
# @click.argument('issue_id')
|
|
# def issue(issue_id):
|
|
# try:
|
|
# issue = client.issue(issue_id)
|
|
# except JIRAError as e:
|
|
# print(f'Problem retrieving issue: {e.text}')
|
|
# return
|
|
#
|
|
# # import IPython
|
|
# # IPython.embed()
|
|
# original_content = issue_to_yaml(issue)
|
|
# print(original_content)
|
|
# # new_content = editor.edit(contents=original_content).decode('utf-8')
|
|
# # print(dir(issue))
|
|
|
|
|
|
@main.command()
|
|
def dump():
|
|
try:
|
|
from yaml import CDumper as Dumper
|
|
except ImportError:
|
|
from yaml import Dumper
|
|
|
|
import ruamel.yaml
|
|
# from ruamel.yaml import YAML
|
|
import yaml
|
|
|
|
def str_presenter(dumper, data):
|
|
"""configures yaml for dumping multiline strings
|
|
Ref: https://stackoverflow.com/questions/8640959/how-can-i-control-what-scalar-form-pyyaml-uses-for-my-data"""
|
|
if data.count('\n') > 0: # check for multiline string
|
|
return dumper.represent_scalar('tag:yaml.org,2002:str', data, style='|')
|
|
return dumper.represent_scalar('tag:yaml.org,2002:str', data)
|
|
|
|
# yaml.add_representer(str, str_presenter)
|
|
# yaml.representer.SafeRepresenter.add_representer(str, str_presenter) # to use with safe_dum
|
|
|
|
def yaml_multiline_string_pipe(dumper, data):
|
|
text_list = [line for line in data.splitlines()]
|
|
fixed_data = "\n".join(text_list)
|
|
if len(text_list) > 1:
|
|
return dumper.represent_scalar('tag:yaml.org,2002:str', data, style="|")
|
|
return dumper.represent_scalar('tag:yaml.org,2002:str', fixed_data)
|
|
|
|
# yaml.add_representer(str, yaml_multiline_string_pipe)
|
|
print(yaml.dump({"multiline": "First line \nSecond line\nThird line"},
|
|
allow_unicode=True, Dumper=Dumper, default_style="|"))
|
|
import sys
|
|
ruamel.yaml.dump({"multiline": "First line \nSecond line\nThirdline"}, sys.stdout, default_style='|')
|
|
|
|
|
|
@main.group(invoke_without_command=True)
|
|
@click.argument('issue_id')
|
|
@pass_client
|
|
@click.pass_obj
|
|
def issue(obj: Dict[str, Any], client, issue_id: str):
|
|
from jira.exceptions import JIRAError
|
|
|
|
try:
|
|
issue = client.issue(issue_id)
|
|
except JIRAError as e:
|
|
raise click.ClickException(f"Problem retrieving {issue_id}: {e.text}")
|
|
|
|
obj['issue'] = issue
|
|
obj['issue_id'] = issue_id
|
|
|
|
|
|
def pass_issue(fn: Callable) -> Callable:
|
|
@functools.wraps(fn)
|
|
@click.pass_obj
|
|
def wrapped(obj, *args, **kwargs):
|
|
issue = obj['issue']
|
|
return fn(issue, *args, **kwargs)
|
|
return wrapped
|
|
|
|
|
|
@issue.group()
|
|
def link():
|
|
pass
|
|
|
|
|
|
def _format_links(client, issue):
|
|
lines = []
|
|
issuelinks = getattr(issue.fields, 'issuelinks', [])
|
|
if issuelinks:
|
|
lines.append("Issue Links")
|
|
# TODO group by type?
|
|
for issuelink in issuelinks:
|
|
other_issue = getattr(issuelink, 'inwardIssue', getattr(issuelink, 'outwardIssue', None))
|
|
summary = indent('\n'.join(wrap(other_issue.fields.summary, width=72)), prefix=' ' * 4)
|
|
lines.append(f" {issuelink.type} - {other_issue.key}\n{summary}")
|
|
|
|
remote_links = client.remote_links(issue.id)
|
|
if remote_links:
|
|
if lines:
|
|
lines.append('')
|
|
lines.append("Remote Links:")
|
|
for link in remote_links:
|
|
# TODO tabulate?
|
|
lines.append(f" {link.object.title} => {link.object.url}")
|
|
|
|
return '\n'.join(lines)
|
|
|
|
|
|
@link.command('list')
|
|
@pass_issue
|
|
@pass_client
|
|
def link_list(client, issue):
|
|
print(_format_links(client, issue))
|
|
|
|
|
|
def prompt(description: str, *, multiline_ok: bool = True, initial_content: str = '', empty_ok: bool = False) -> str:
|
|
divider_line = '--- Everything after this line will be removed'
|
|
|
|
contents = f"{initial_content}\n{divider_line}\n\n{description}"
|
|
contents = editor.edit(contents=contents).decode('utf-8')
|
|
if not contents:
|
|
raise click.ClickException('Empty content. Cannot continue.')
|
|
|
|
contents_lines = contents.splitlines()
|
|
for i, line in enumerate(reversed(contents_lines), start=1):
|
|
if line == divider_line:
|
|
break
|
|
else:
|
|
raise click.ClickException('Could not find divider line. Please keep the footer intact.')
|
|
contents_lines = contents_lines[:-i]
|
|
|
|
if not multiline_ok and len(contents_lines) > 1:
|
|
# TODO build a retry into this?
|
|
raise click.ClickException('Input needs to be a single line only')
|
|
|
|
if not empty_ok and not any(contents_lines):
|
|
raise click.ClickException('Empty content. Cannot continue.')
|
|
|
|
return '\n'.join(contents_lines)
|
|
|
|
|
|
@link.command('create')
|
|
@click.argument('URL', required=False)
|
|
@click.argument('TITLE', required=False)
|
|
@pass_issue
|
|
@pass_client
|
|
def link_create(client, issue, url: str, title: str):
|
|
|
|
if url is None:
|
|
url = prompt(f"Input the URL for the link to add to {issue.key}", multiline_ok=False)
|
|
if title is None:
|
|
title = prompt(f"Input a single line of title for {url}. This will be added to {issue.key}.",
|
|
multiline_ok=False)
|
|
|
|
client.add_simple_link(issue.key, {'url': url, 'title': title})
|
|
|
|
|
|
@link.command('issue')
|
|
@pass_issue
|
|
@pass_client
|
|
def link_issue(client, issue):
|
|
"""Link to another issue"""
|
|
raise NotImplementedError
|
|
|
|
|
|
@issue.command('open')
|
|
@pass_issue
|
|
def issue_open(issue):
|
|
import subprocess
|
|
|
|
subprocess.call(['xdg-open', issue.permalink()])
|
|
|
|
|
|
@issue.group()
|
|
def label():
|
|
pass
|
|
|
|
|
|
@label.command('list')
|
|
@pass_issue
|
|
def label_list(issue):
|
|
"""List all labels on the issue"""
|
|
print('\n'.join(issue.fields.labels))
|
|
|
|
|
|
@label.command('edit')
|
|
@pass_issue
|
|
def label_edit(issue):
|
|
"""Edit the labels of the issue"""
|
|
import jira
|
|
|
|
original_labels = issue.fields.labels
|
|
# TODO add a configurable list of "often used" labels into the description
|
|
labels = set(prompt('Add or remove labels by adding them line by line or removing one',
|
|
initial_content='\n'.join(original_labels),
|
|
empty_ok=True).splitlines())
|
|
|
|
original_labels = set(original_labels)
|
|
|
|
if labels == original_labels:
|
|
print(f"Not updating {issue.key} as the labels are the same.")
|
|
return
|
|
|
|
changes = []
|
|
|
|
added_labels = labels - original_labels
|
|
for label in added_labels:
|
|
changes.append({'add': label})
|
|
|
|
removed_labels = original_labels - labels
|
|
for label in removed_labels:
|
|
changes.append({'remove': label})
|
|
|
|
# inspired by jira.Issue.add_field_value
|
|
jira.resources.Resource.update(issue, fields={"update": {"labels": changes}})
|
|
|
|
|
|
@issue.command()
|
|
@pass_issue
|
|
@pass_client
|
|
def debug(client, issue):
|
|
import IPython
|
|
IPython.embed()
|
|
|
|
|
|
@issue.command()
|
|
@pass_issue
|
|
@pass_client
|
|
def show(client, issue):
|
|
"""Show a lot of information about the issue"""
|
|
print(f"{issue.key} on {issue.fields.status} as {issue.fields.issuetype} assigned to {issue.fields.assignee}")
|
|
if getattr(issue.fields, 'parent', None):
|
|
parent = issue.fields.parent
|
|
print(f" parent: {parent.key} on {parent.fields.status} as {parent.fields.issuetype}")
|
|
summary = indent('\n'.join(wrap(parent.fields.summary, width=72)), prefix=' ' * 4)
|
|
print(f"{summary}")
|
|
# TODO handle customfields in a good manner, e.g. let the user enable
|
|
# showing them by a custom name or their own. use cache for that.
|
|
print(indent(dedent(f"""\
|
|
created: {issue.fields.created}
|
|
components: {', '.join(c.name for c in issue.fields.components)}
|
|
flagged: {bool(getattr(issue.fields, 'customfield_13216', False))}
|
|
labels: {', '.join(issue.fields.labels)}
|
|
priority: {issue.fields.priority}
|
|
reporter: {issue.fields.reporter}"""),
|
|
' ' * 2))
|
|
|
|
def _header(s):
|
|
# print(f"\n {s}\n {'+' * len(s)}")
|
|
print(f"\n +++ {s} +++")
|
|
|
|
summary = indent('\n'.join(wrap(issue.fields.summary, replace_whitespace=False)), prefix=' ' * 2)
|
|
_header('summary')
|
|
print(summary)
|
|
|
|
description = indent('\n'.join(wrap(issue.fields.description or '', replace_whitespace=False)), prefix=' ' * 2)
|
|
_header('description')
|
|
print(description)
|
|
|
|
_header('attachments')
|
|
for i, attachment in enumerate(issue.fields.attachment):
|
|
print(f" * {attachment.filename} by {attachment.author}")
|
|
print(f" {attachment.content}")
|
|
print(f" {attachment.mimeType} with {attachment.size} B")
|
|
|
|
_header('links')
|
|
links = _format_links(client, issue)
|
|
if links:
|
|
print(indent(links, prefix=' ' * 2))
|
|
|
|
_header('comments')
|
|
for i, comment in enumerate(issue.fields.comment.comments):
|
|
if i != 0:
|
|
print('')
|
|
print(f" {comment.created} - {comment.author} - {comment.id}")
|
|
body = indent('\n'.join(wrap(comment.body, replace_whitespace=False)), ' ' * 4)
|
|
print(body)
|
|
|
|
print('')
|
|
|
|
|
|
@issue.group()
|
|
def comment():
|
|
pass
|
|
|
|
|
|
@comment.command('create')
|
|
@click.option('--text', help="Text of the comment. Prompted for in editor if not provided.")
|
|
@pass_issue
|
|
@pass_client
|
|
def comment_create(client, issue, text):
|
|
"""Create a comment on the issue"""
|
|
# TODO add some help text for the syntax in the description
|
|
# TODO add all issue info to the description, so we can autocomplete in the
|
|
# editor
|
|
if not text:
|
|
text = prompt(f"Comment text to add as new comment to {issue.key}")
|
|
client.add_comment(issue.key, text)
|
|
|
|
|
|
def get_comment(ctx, param, comment_id):
|
|
"""click callback returning a Comment for the comment_id value given by the user"""
|
|
from jira.exceptions import JIRAError
|
|
|
|
issue = ctx.obj['issue']
|
|
|
|
if not comment_id:
|
|
from clintermission import cli_select_item
|
|
options = []
|
|
for comment in issue.fields.comment.comments:
|
|
# TODO make this roughly terminal width or add a shorten() method
|
|
# with … at the end
|
|
body = comment.body.replace('\n', ' ')[:64]
|
|
options.append((f"{comment.created} - {comment.author} - {body}", comment.id))
|
|
_, comment_id = cli_select_item(options)
|
|
|
|
client = ctx.obj['client']
|
|
try:
|
|
return client.comment(issue.key, comment_id, expand=['body'])
|
|
except JIRAError as e:
|
|
raise click.ClickException(f"Problem retrieving {comment_id} for issue {issue.key}: {e.text}")
|
|
|
|
|
|
@comment.command('edit')
|
|
@click.argument('comment', required=False, callback=get_comment)
|
|
@pass_issue
|
|
@pass_client
|
|
def comment_edit(client, issue, comment):
|
|
"""Edit the text of an issues comment"""
|
|
|
|
text = prompt(f"Comment text to set as new body for comment {comment.id} on {issue.key}",
|
|
initial_content=comment.body)
|
|
if text == comment.body:
|
|
print("No changes. Aborting.")
|
|
return
|
|
|
|
comment.update(body=text)
|
|
|
|
|
|
@comment.command('delete')
|
|
@click.argument('comment', required=False, callback=get_comment)
|
|
@pass_issue
|
|
@pass_client
|
|
def comment_delete(client, issue, comment):
|
|
# TODO print formatted comment for verification
|
|
click.confirm("Do you really want to delete this ^ ?", abort=True)
|
|
comment.delete()
|