1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| def get_env(): BUILD_PIPELINE_NAME = os.getenv('BUILD_PIPELINE_NAME') BUILD_PIPELINE_ID = os.getenv('BUILD_PIPELINE_ID') BUILD_NAME = os.getenv('BUILD_NAME') BUILD_TEAM_NAME = os.getenv('BUILD_TEAM_NAME') BUILD_JOB_NAME = os.getenv('BUILD_JOB_NAME') BUILD_ID = os.getenv('BUILD_ID') BUILD_TEAM_ID = os.getenv('BUILD_TEAM_ID') BUILD_JOB_ID = os.getenv('BUILD_JOB_ID') ATC_EXTERNAL_URL = os.getenv('ATC_EXTERNAL_URL') URL = '{ATC_EXTERNAL_URL}/teams/{BUILD_TEAM_NAME}/pipelines/{BUILD_PIPELINE_NAME}/jobs/{BUILD_JOB_NAME}/builds/{BUILD_NAME}'.format( ATC_EXTERNAL_URL=ATC_EXTERNAL_URL, BUILD_TEAM_NAME=BUILD_TEAM_NAME, BUILD_PIPELINE_NAME=BUILD_PIPELINE_NAME, BUILD_JOB_NAME=BUILD_JOB_NAME, BUILD_NAME=BUILD_NAME) env_dict = { 'BUILD_PIPELINE_NAME': BUILD_PIPELINE_NAME, 'BUILD_PIPELINE_ID': BUILD_PIPELINE_ID, 'BUILD_NAME': BUILD_NAME, 'BUILD_TEAM_NAME': BUILD_TEAM_NAME, 'BUILD_JOB_NAME': BUILD_JOB_NAME, 'BUILD_ID': BUILD_ID, 'BUILD_TEAM_ID': BUILD_TEAM_ID, 'BUILD_JOB_ID': BUILD_JOB_ID, 'ATC_EXTERNAL_URL': ATC_EXTERNAL_URL, 'URL': URL } return env_dict def payload_data(payload): source = payload["params"] url = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send" if not source.get("url") else source.get("url") secret = source["secret"] msgtype = "markdown" if not source.get("msgtype") else source.get("msgtype") level = "success" if not source.get("level") else source.get("level") content = "No content" if not source.get("content") else source.get("content") payload_dict = {"url": url, "secret": secret, "msgtype": msgtype, "level": level, "content": content} return payload_dict
def _out(stream): payload = get_args(stream) payload_dict = payload_data(payload)
url, secret, msgtype, level, content = payload_dict.values()
data = message(msgtype, level, content) post_message(url, secret, data) timestamp = get_timestamp() return {"version": {"version": timestamp}}
|