#       Licensed to the Apache Software Foundation (ASF) under one
#       or more contributor license agreements.  See the NOTICE file
#       distributed with this work for additional information
#       regarding copyright ownership.  The ASF licenses this file
#       to you under the Apache License, Version 2.0 (the
#       "License"); you may not use this file except in compliance
#       with the License.  You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#       Unless required by applicable law or agreed to in writing,
#       software distributed under the License is distributed on an
#       "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
#       KIND, either express or implied.  See the License for the
#       specific language governing permissions and limitations
#       under the License.
import email.parser
import email.iterators
import operator
import shutil
from textwrap import dedent

import six
from base64 import b64encode
import logging
import importlib.resources

import tg
import mock
from tg import tmpl_context as c, app_globals as g
import pytest

from ming.odm import FieldProperty, Mapper
from ming.odm import ThreadLocalODMSession
from testfixtures import LogCapture

from alluratest.controller import setup_basic_test, setup_global_objects, TestController

from allura import model as M
from allura.command.taskd import TaskdCommand
from allura.lib import helpers as h
from allura.lib.mail_util import MAX_MAIL_LINE_OCTETS, email_policy
from allura.tasks import event_tasks
from allura.tasks import index_tasks
from allura.tasks import mail_tasks
from allura.tasks import notification_tasks
from allura.tasks import repo_tasks
from allura.tasks import export_tasks
from allura.tasks import admin_tasks
from allura.tests import decorators as td
from allura.tests.exclude_from_rewrite_hook import raise_compound_exception
from allura.lib.decorators import event_handler, task


class TestRepoTasks:
    """Tests for the ``clone``, ``merge`` and ``can_merge`` tasks in ``allura.tasks.repo_tasks``."""

    def setup_method(self, method):
        """Run the basic test setup and create the global objects before each test."""
        setup_basic_test()
        setup_global_objects()

    @mock.patch('allura.tasks.repo_tasks.c.app')
    @mock.patch('allura.tasks.repo_tasks.g.post_event')
    def test_clone_posts_event_on_failure(self, post_event, app):
        """When ``init_as_clone`` raises, ``clone`` posts a ``repo_clone_task_failed`` event."""
        source_url = 'fake_source_url'
        app.repo.init_as_clone.side_effect = Exception('fake_traceback')

        repo_tasks.clone(None, None, source_url)

        event_args = post_event.call_args[0]
        assert event_args[0] == 'repo_clone_task_failed'
        assert event_args[1] == source_url
        assert event_args[2] is None
        # event_args[3] is a traceback string and is intentionally not checked

    @mock.patch('allura.tasks.repo_tasks.session', autospec=True)
    @mock.patch.object(M, 'MergeRequest')
    def test_merge(self, MR, session):
        """``merge`` merges via the app's repo, marks the request merged and flushes it."""
        app_mock = mock.Mock(activity_name='code merge', activity_url='/fake/url', activity_extras={},
                             node_id=None)
        merge_request = mock.Mock(_id='_id', activity_name='merge req', activity_url='/fake/url',
                                  activity_extras={}, node_id=None, app=app_mock)
        MR.query.get.return_value = merge_request

        repo_tasks.merge(merge_request._id)

        merge_request.app.repo.merge.assert_called_once_with(merge_request)
        assert merge_request.status == 'merged'
        session.assert_called_once_with(merge_request)
        session.return_value.flush.assert_called_once_with(merge_request)

    @mock.patch.object(M, 'MergeRequest')
    def test_can_merge(self, MR):
        """``can_merge`` asks the repo and stores the answer via ``set_can_merge_cache``."""
        # M.MergeRequest is patched, so this "instance" is itself a mock
        merge_request = M.MergeRequest(_id='_id')
        MR.query.get.return_value = merge_request

        repo_tasks.can_merge(merge_request._id)

        repo_can_merge = merge_request.app.repo.can_merge
        repo_can_merge.assert_called_once_with(merge_request)
        merge_request.set_can_merge_cache.assert_called_once_with(repo_can_merge.return_value)


# used in test_post_event_from_within_task below
@task
def _task_that_creates_event(event_name):
    """Post ``event_name`` from inside a task and check its event task is not queryable yet."""
    g.post_event(event_name)
    # event does not get flushed to db right away (at end of task, ming middleware will flush it)
    queued = M.MonQTask.query.get(task_name='allura.tasks.event_tasks.event', args=[event_name])
    assert not queued


class TestEventTasks:
    """Tests for firing events and for when posted event tasks reach the database."""

    def setup_method(self, method):
        """Run the basic test setup and reset the list that the ``my_event`` handler appends to."""
        setup_basic_test()
        setup_global_objects()
        self.called_with = []

    @staticmethod
    def _queued_event_task(event_name):
        """Query ``MonQTask`` for the event task that was posted with ``event_name`` as its only arg."""
        return M.MonQTask.query.get(task_name='allura.tasks.event_tasks.event', args=[event_name])

    def test_fire_event(self):
        """Firing ``my_event`` passes the extra positional and keyword args to the handler."""
        event_tasks.event('my_event', self, 1, 2, a=5)
        expected_calls = [((1, 2), {'a': 5})]
        assert self.called_with == expected_calls, self.called_with

    def test_post_event_explicit_flush(self):
        """``__task_flush_immediately`` controls whether the event task is persisted right away."""
        g.post_event('my_event1', __task_flush_immediately=True)
        assert self._queued_event_task('my_event1')

        g.post_event('my_event2', __task_flush_immediately=False)
        assert not self._queued_event_task('my_event2')
        ThreadLocalODMSession.flush_all()
        assert self._queued_event_task('my_event2')

    def test_post_event_from_script(self):
        """With ``PATH_INFO`` set to ``--script--`` the event task is persisted immediately."""
        # simulate post_event being called from a paster script command:
        script_environ = mock.patch.dict(tg.request.environ, PATH_INFO='--script--')
        with script_environ:
            g.post_event('my_event3')
            # event task is flushed to db right away:
            assert self._queued_event_task('my_event3')

    def test_post_event_from_within_task(self):
        """An event posted inside a running task is persisted once that task has finished."""
        # instead of M.MonQTask.run_ready() run real 'taskd' so we get all the setup we need
        taskd = TaskdCommand('taskd')
        ini_path = importlib.resources.files('allura') / '../test.ini'
        taskd.parse_args([str(ini_path)])
        taskd.keep_running = True
        taskd.restart_when_done = False
        _task_that_creates_event.post('my_event4')

        def halt_after_current_task(*args):
            taskd.keep_running = False

        with mock.patch('allura.command.taskd.setproctitle') as setproctitle:
            # avoid proc title change; useful hook to stop taskd
            setproctitle.side_effect = halt_after_current_task
            taskd.worker()
        # after the initial task is done, the event task has been persisted:
        assert self._queued_event_task('my_event4')

    def test_compound_error(self):
        """A compound exception is logged once at ERROR level and every sub-error lands in the result."""
        failing_task = raise_compound_exception.post()
        # 'monq.raise_errors': False matches normal non-test behavior
        no_raise = mock.patch.dict(tg.config, {'monq.raise_errors': False})
        with LogCapture(level=logging.ERROR) as captured, no_raise:
            failing_task()
        # captured.check() would be nice, but string is too detailed to check
        first_record = captured.records[0]
        assert first_record.name == 'allura.model.monq_model'
        logged = first_record.getMessage()
        assert "AssertionError('assert 0'" in logged
        assert "AssertionError('assert 5'" in logged
        assert ' on job <MonQTask ' in logged
        assert ' (error) P:10 allura.tests.exclude_from_rewrite_hook.raise_compound_exception ' in logged
        for number in range(10):
            assert ('assert %d' % number) in failing_task.result


class TestIndexTasks:
    """Tests for the solr indexing tasks in ``allura.tasks.index_tasks``."""

    def setup_method(self, method):
        """Run the basic test setup and create the global objects before each test."""
        setup_basic_test()
        setup_global_objects()

    def test_add_projects(self):
        """Running the ``add_projects`` task adds one solr entry per project."""
        g.solr.db.clear()
        old_solr_size = len(g.solr.db)
        projects = M.Project.query.find().all()
        index_tasks.add_projects.post([p._id for p in projects])
        M.MonQTask.run_ready()
        new_solr_size = len(g.solr.db)
        assert old_solr_size + len(projects) == new_solr_size

    @td.with_wiki
    def test_del_projects(self):
        """``del_projects`` issues exactly one solr delete whose query names every project."""
        projects = M.Project.query.find().all()
        index_tasks.add_projects([p._id for p in projects])

        with mock.patch('allura.tasks.index_tasks.g.solr') as solr:
            index_tasks.del_projects([p.index_id() for p in projects])
            # Compare with ==; "assert count, 1" would only use 1 as the failure message
            # and pass for any non-zero call count.
            assert solr.delete.call_count == 1
            delete_query = solr.delete.call_args[1]['q']
            for project in projects:
                assert project.index_id() in delete_query

    @td.with_wiki
    def test_add_artifacts(self):
        """``add_artifacts`` creates shortlinks, solr entries and backrefs for each artifact."""
        from allura.lib.search import find_shortlinks
        with mock.patch('allura.lib.search.find_shortlinks') as find_slinks:
            # delegate to the real function (imported before patching) while recording the calls
            find_slinks.side_effect = find_shortlinks

            old_shortlinks = M.Shortlink.query.find().count()
            old_solr_size = len(g.solr.db)
            artifacts = [_TestArtifact() for _ in range(5)]
            for i, a in enumerate(artifacts):
                a._shorthand_id = 't%d' % i
                a.text = 'This is a reference to [t3]'
            arefs = [M.ArtifactReference.from_artifact(a) for a in artifacts]
            ref_ids = [r._id for r in arefs]
            M.artifact_orm_session.flush()
            index_tasks.add_artifacts(ref_ids)
            new_shortlinks = M.Shortlink.query.find().count()
            new_solr_size = len(g.solr.db)
            assert old_shortlinks + 5 == new_shortlinks, 'Shortlinks not created'
            assert old_solr_size + 5 == new_solr_size, "Solr additions didn't happen"
            M.main_orm_session.flush()
            M.main_orm_session.clear()
            t3 = _TestArtifact.query.get(_shorthand_id='t3')
            # every artifact's text references [t3], so t3 gets one backref per artifact
            assert len(t3.backrefs) == 5, t3.backrefs
            expected_calls = [mock.call(a.index().get('text')) for a in artifacts]
            assert find_slinks.call_args_list == expected_calls

    @td.with_wiki
    @mock.patch('allura.tasks.index_tasks.g.solr')
    def test_del_artifacts(self, solr):
        """``del_artifacts`` removes the shortlinks again and deletes all refs with one solr query."""
        old_shortlinks = M.Shortlink.query.find().count()
        artifacts = [_TestArtifact(_shorthand_id='ta_%s' % x)
                     for x in range(5)]
        M.artifact_orm_session.flush()
        arefs = [M.ArtifactReference.from_artifact(a) for a in artifacts]
        ref_ids = [r._id for r in arefs]
        M.artifact_orm_session.flush()
        index_tasks.add_artifacts(ref_ids)
        M.main_orm_session.flush()
        M.main_orm_session.clear()
        new_shortlinks = M.Shortlink.query.find().count()
        assert old_shortlinks + 5 == new_shortlinks, 'Shortlinks not created'
        assert solr.add.call_count == 1
        # sort both sides by id so the comparison does not depend on ordering
        sort_key = operator.itemgetter('id')
        assert (
            sorted(solr.add.call_args[0][0], key=sort_key) ==
            sorted((ref.artifact.solarize() for ref in arefs),
                   key=sort_key))
        index_tasks.del_artifacts(ref_ids)
        M.main_orm_session.flush()
        M.main_orm_session.clear()
        new_shortlinks = M.Shortlink.query.find().count()
        assert old_shortlinks == new_shortlinks, 'Shortlinks not deleted'
        solr_query = 'id:({})'.format(' || '.join(ref_ids))
        solr.delete.assert_called_once_with(q=solr_query)


class TestMailTasks:
    """Tests for sending and routing email through ``allura.tasks.mail_tasks``."""

    def setup_method(self, method):
        """Run the basic test setup and create the global objects before each test."""
        setup_basic_test()
        setup_global_objects()

    # these tests go down through the mail_util.SMTPClient.sendmail method
    # since usage is generally through the task, and not using mail_util
    # directly

    @staticmethod
    def _last_sent(smtp):
        """Return ``(recipients, message)`` from the latest ``sendmail`` call on the mocked client."""
        _return_path, recipients, message = smtp.sendmail.call_args[0]
        return recipients, message

    def test_send_email_ascii_with_user_lookup(self):
        """User ids given as from/destination are resolved to addresses; plain and html parts are sent."""
        c.user = M.User.by_username('test-admin')
        with mock.patch.object(mail_tasks.smtp_client, '_client') as smtp:
            mail_tasks.sendmail(
                fromaddr=str(c.user._id),
                destinations=[str(c.user._id)],
                text='This is a test',
                reply_to=g.noreply,
                subject='Test subject',
                message_id=h.gen_message_id())
            assert smtp.sendmail.call_count == 1
            recipients, message = self._last_sent(smtp)
            lines = message.split(email_policy.linesep)

            assert recipients == [c.user.get_pref('email_address')]
            assert 'Reply-To: %s' % g.noreply in lines
            assert 'From: "Test Admin" <test-admin@users.localhost>' in lines
            assert 'Subject: Test subject' in lines
            # plain
            assert 'This is a test' in lines
            # html
            html_line = '<div class="markdown_content"><p>This is a test</p></div>'
            assert html_line in lines

    def test_send_email_nonascii(self):
        """Non-ascii display name, subject and text are encoded; the address itself is not."""
        with mock.patch.object(mail_tasks.smtp_client, '_client') as smtp:
            mail_tasks.sendmail(
                fromaddr='"По" <foo@bar.com>',
                destinations=['blah@blah.com'],
                text='Громады стройные теснятся',
                reply_to=g.noreply,
                subject='По оживлённым берегам',
                message_id=h.gen_message_id())
            assert smtp.sendmail.call_count == 1
            recipients, message = self._last_sent(smtp)
            lines = message.split(email_policy.linesep)

            assert recipients == ['blah@blah.com']
            assert 'Reply-To: %s' % g.noreply in lines

            # The address portion must not be encoded, only the name portion can be.
            unquoted_cyrillic_No = '=?utf-8?q?=D0=9F=D0=BE?='  # По
            quoted_cyrillic_No = '=?utf-8?b?ItCf0L4i?='  # "По"
            accepted_names = (quoted_cyrillic_No, unquoted_cyrillic_No)
            assert any(f'From: {name} <foo@bar.com>' in lines for name in accepted_names), lines
            encoded_subject = 'Subject: =?utf-8?b?0J/QviDQvtC20LjQstC70ZHQvdC90YvQvCDQsdC10YDQtdCz0LDQvA==?='
            assert encoded_subject in lines
            assert 'Content-Type: text/plain; charset="utf-8"' in lines
            assert 'Content-Transfer-Encoding: base64' in lines
            encoded_text = six.ensure_text(b64encode('Громады стройные теснятся'.encode()))
            assert encoded_text in lines

    def test_send_email_with_disabled_user(self):
        """Mail from a disabled user is sent with the noreply address as ``From``."""
        c.user = M.User.by_username('test-admin')
        c.user.disabled = True
        recipient = M.User.by_username('test-user-1')
        recipient.preferences['email_address'] = 'user1@mail.com'
        ThreadLocalODMSession.flush_all()
        with mock.patch.object(mail_tasks.smtp_client, '_client') as smtp:
            mail_tasks.sendmail(
                fromaddr=str(c.user._id),
                destinations=[str(recipient._id)],
                text='This is a test',
                reply_to=g.noreply,
                subject='Test subject',
                message_id=h.gen_message_id())
            assert smtp.sendmail.call_count == 1
            _recipients, message = self._last_sent(smtp)
            lines = message.split(email_policy.linesep)
            assert 'From: %s' % g.noreply in lines

    def test_send_email_with_disabled_destination_user(self):
        """Nothing is sent when the only destination is a disabled user."""
        c.user = M.User.by_username('test-admin')
        recipient = M.User.by_username('test-user-1')
        recipient.preferences['email_address'] = 'user1@mail.com'
        recipient.disabled = True
        ThreadLocalODMSession.flush_all()
        with mock.patch.object(mail_tasks.smtp_client, '_client') as smtp:
            mail_tasks.sendmail(
                fromaddr=str(c.user._id),
                destinations=[str(recipient._id)],
                text='This is a test',
                reply_to=g.noreply,
                subject='Test subject',
                message_id=h.gen_message_id())
            assert smtp.sendmail.call_count == 0

    def test_sendsimplemail_with_disabled_user(self):
        """``sendsimplemail`` uses the user's address while enabled and noreply once disabled."""
        c.user = M.User.by_username('test-admin')
        with mock.patch.object(mail_tasks.smtp_client, '_client') as smtp:
            mail_tasks.sendsimplemail(
                fromaddr=str(c.user._id),
                toaddr='test@mail.com',
                text='This is a test',
                reply_to=g.noreply,
                subject='Test subject',
                message_id=h.gen_message_id())
            assert smtp.sendmail.call_count == 1
            _recipients, message = self._last_sent(smtp)
            lines = message.split(email_policy.linesep)
            assert 'From: "Test Admin" <test-admin@users.localhost>' in lines

            c.user.disabled = True
            ThreadLocalODMSession.flush_all()
            mail_tasks.sendsimplemail(
                fromaddr=str(c.user._id),
                toaddr='test@mail.com',
                text='This is a test',
                reply_to=g.noreply,
                subject='Test subject',
                message_id=h.gen_message_id())
            assert smtp.sendmail.call_count == 2
            _recipients, message = self._last_sent(smtp)
            lines = message.split(email_policy.linesep)
            assert 'From: %s' % g.noreply in lines

    def test_email_sender_to_headers(self):
        """``Sender`` is passed through; ``To`` is ``toaddr`` for sendsimplemail, ``reply_to`` for sendmail."""
        c.user = M.User.by_username('test-admin')
        with mock.patch.object(mail_tasks.smtp_client, '_client') as smtp:
            mail_tasks.sendsimplemail(
                fromaddr=str(c.user._id),
                toaddr='test@mail.com',
                text='This is a test',
                reply_to=g.noreply,
                subject='Test subject',
                sender='tickets@test.p.domain.net',
                message_id=h.gen_message_id())
            assert smtp.sendmail.call_count == 1
            _recipients, message = self._last_sent(smtp)
            lines = message.split(email_policy.linesep)
            assert 'From: "Test Admin" <test-admin@users.localhost>' in lines
            assert 'Sender: tickets@test.p.domain.net' in lines
            assert 'To: test@mail.com' in lines

            smtp.reset_mock()
            mail_tasks.sendmail(
                fromaddr=str(c.user._id),
                destinations=[str(c.user._id)],
                text='This is a test',
                reply_to='123@tickets.test.p.domain.net',
                subject='Test subject',
                sender='tickets@test.p.domain.net',
                message_id=h.gen_message_id())
            assert smtp.sendmail.call_count == 1
            _recipients, message = self._last_sent(smtp)
            lines = message.split(email_policy.linesep)
            assert 'From: "Test Admin" <test-admin@users.localhost>' in lines
            assert 'Sender: tickets@test.p.domain.net' in lines
            assert 'To: 123@tickets.test.p.domain.net' in lines

    def test_email_references_header(self):
        """``references`` given as a list or a single string ends up angle-bracketed in ``References``."""
        c.user = M.User.by_username('test-admin')
        with mock.patch.object(mail_tasks.smtp_client, '_client') as smtp:
            mail_tasks.sendsimplemail(
                fromaddr=str(c.user._id),
                toaddr='test@mail.com',
                text='This is a test',
                reply_to=g.noreply,
                subject='Test subject',
                references=['a', 'b', 'c'],
                message_id=h.gen_message_id())
            assert smtp.sendmail.call_count == 1
            _recipients, message = self._last_sent(smtp)
            lines = message.split(email_policy.linesep)
            assert 'From: "Test Admin" <test-admin@users.localhost>' in lines
            assert 'References: <a> <b> <c>' in lines

            smtp.reset_mock()
            mail_tasks.sendmail(
                fromaddr=str(c.user._id),
                destinations=[str(c.user._id)],
                text='This is a test',
                reply_to=g.noreply,
                subject='Test subject',
                references='ref',
                message_id=h.gen_message_id())
            assert smtp.sendmail.call_count == 1
            _recipients, message = self._last_sent(smtp)
            lines = message.split(email_policy.linesep)
            assert 'From: "Test Admin" <test-admin@users.localhost>' in lines
            assert 'References: <ref>' in lines

    def test_cc(self):
        """A ``cc`` address shows up both in the ``CC`` header and in the recipient list."""
        c.user = M.User.by_username('test-admin')
        with mock.patch.object(mail_tasks.smtp_client, '_client') as smtp:
            mail_tasks.sendsimplemail(
                fromaddr=str(c.user._id),
                toaddr='test@mail.com',
                text='This is a test',
                reply_to=g.noreply,
                subject='Test subject',
                cc='someone@example.com',
                message_id=h.gen_message_id())
            assert smtp.sendmail.call_count == 1
            recipients, message = self._last_sent(smtp)
            # substring check against the unsplit message text
            assert 'CC: someone@example.com' in message
            assert 'someone@example.com' in recipients

    def test_fromaddr_objectid_not_str(self):
        """``fromaddr`` may be passed as the raw ``_id`` value rather than its string form."""
        c.user = M.User.by_username('test-admin')
        with mock.patch.object(mail_tasks.smtp_client, '_client') as smtp:
            mail_tasks.sendsimplemail(
                fromaddr=c.user._id,
                toaddr='test@mail.com',
                text='This is a test',
                reply_to=g.noreply,
                subject='Test subject',
                message_id=h.gen_message_id())
            assert smtp.sendmail.call_count == 1
            _recipients, message = self._last_sent(smtp)
            # substring check against the unsplit message text
            assert 'From: "Test Admin" <test-admin@users.localhost>' in message

    @pytest.mark.parametrize('bodychars', [
        '0123456789',  # plain ascii is handled different since it doesn't necessarily need to be encoded
        'Громады стро ',
    ])
    def test_send_email_long_lines(self, bodychars):
        """Very long bodies and headers are wrapped to the line limit and still decode intact."""
        with mock.patch.object(mail_tasks.smtp_client, '_client') as smtp:
            mail_tasks.sendsimplemail(
                fromaddr='"По" <foo@bar.com>',
                toaddr='blah@blah.com',
                text=bodychars * 100,
                reply_to=g.noreply,
                subject='123451234512345' * 100,
                references=['foo@example.com'] * 100,  # needs to handle really long headers as well
                message_id=h.gen_message_id())
            _recipients, message = self._last_sent(smtp)

            for sent_line in message.split(email_policy.linesep):
                assert len(sent_line) <= MAX_MAIL_LINE_OCTETS

            parsed = email.parser.Parser().parsestr(message)
            plain_part = next(email.iterators.typed_subpart_iterator(parsed, 'text', 'plain'))
            plain = plain_part.get_payload(decode=True).decode('utf-8')
            html_part = next(email.iterators.typed_subpart_iterator(parsed, 'text', 'html'))
            html = html_part.get_payload(decode=True).decode('utf-8')

            doubled = bodychars + bodychars
            assert doubled in plain
            assert f'<div class="markdown_content"><p>{bodychars}{bodychars}' in html

    @td.with_wiki
    def test_receive_email_ok(self):
        """Mail addressed to a wiki page reaches ``ForgeWikiApp.handle_message`` with the page name."""
        c.user = M.User.by_username('test-admin')
        import forgewiki
        with mock.patch.object(forgewiki.wiki_main.ForgeWikiApp, 'handle_message') as handle_message:
            mail_tasks.route_email(
                '0.0.0.0', c.user.email_addresses[0],
                ['Page@wiki.test.p.in.localhost'],
                'This is a mail message')
            positional, _keywords = handle_message.call_args
            assert positional[0] == 'Page'
            assert len(positional) == 2

    @td.with_tool('test', 'Tickets', 'bugs')
    def test_receive_autoresponse(self):
        """A message carrying auto-reply headers is not handed to the tracker app."""
        message = dedent('''\
            Date: Wed, 30 Oct 2013 01:38:40 -0700
            From: <test-admin@domain.net>
            To: <1@bugs.test.p.in.localhost>
            Message-ID: <super-unique-id>
            Subject: Not here Re: Message notification
            Precedence: bulk
            X-Autoreply: yes
            Auto-Submitted: auto-replied

            I'm not here''')
        import forgetracker
        c.user = M.User.by_username('test-admin')
        with mock.patch.object(forgetracker.tracker_main.ForgeTrackerApp, 'handle_message') as handle_message:
            mail_tasks.route_email(
                '0.0.0.0',
                c.user.email_addresses[0],
                ['1@bugs.test.p.in.localhost'],
                message)
            assert handle_message.call_count == 0

    @td.with_tool('test', 'Tickets', 'bugs')
    def test_email_posting_disabled(self):
        """With ``AllowEmailPosting`` set to False, incoming mail is not handed to the tracker app."""
        message = 'Hello, world!'
        import forgetracker
        c.user = M.User.by_username('test-admin')
        with mock.patch.object(forgetracker.tracker_main.ForgeTrackerApp, 'handle_message') as handle_message:
            c.app.config.options = {'AllowEmailPosting': False}
            mail_tasks.route_email(
                '0.0.0.0',
                c.user.email_addresses[0],
                ['1@bugs.test.p.in.localhost'],
                message)
            assert handle_message.call_count == 0


class TestUserNotificationTasks(TestController):
    """Tests for the email notification queued when a user is mentioned in a wiki page."""

    def setup_method(self, method):
        """Run the controller setup, then install the tools this test needs."""
        super().setup_method(method)
        self.setup_with_tools()

    @td.with_wiki
    def setup_with_tools(self):
        """Intentionally empty: the ``with_wiki`` decorator does the setup."""
        pass

    def test_send_usermentions_notification(self):
        """Mentioning a user in a wiki page queues exactly one ``sendsimplemail`` task for them."""
        c.user = M.User.by_username('test-admin')
        mentioned_user = M.User.by_username('test-user-1')
        mentioned_user.set_pref('mention_notifications', True)
        M.MonQTask.query.remove()
        page_params = {'title': 'foo', 'text': 'Hey @test-user-1!'}
        self.app.post('/wiki/foo/update', params=page_params)
        M.MonQTask.run_ready()
        # check email notification
        mail_query = {'task_name': 'allura.tasks.mail_tasks.sendsimplemail'}
        tasks = M.MonQTask.query.find(mail_query).all()
        assert len(tasks) == 1
        mail_kwargs = tasks[0].kwargs
        assert mail_kwargs['subject'] == '[test:wiki] Your name was mentioned'
        assert mail_kwargs['toaddr'] == 'test-user-1@allura.local'
        assert mail_kwargs['reply_to'] == g.noreply
        text = mail_kwargs['text']
        assert 'Your name was mentioned at [foo]' in text
        assert 'by Test Admin' in text
        assert 'auth/subscriptions#notifications' in text


class TestNotificationTasks:
    """Tests for ``allura.tasks.notification_tasks``."""

    def setup_method(self, method):
        """Run the basic test setup and create the global objects before each test."""
        setup_basic_test()
        setup_global_objects()

    def test_delivers_messages(self):
        """``notify`` forwards its arguments to ``Mailbox.deliver`` and then calls ``Mailbox.fire_ready``."""
        with mock.patch.object(M.Mailbox, 'deliver') as deliver:
            with mock.patch.object(M.Mailbox, 'fire_ready') as fire_ready:
                notification_tasks.notify('42', ['52'], 'none')
                deliver.assert_called_with('42', ['52'], 'none')
                fire_ready.assert_called_with()


@event_handler('my_event')
def _my_event(event_type, testcase, *args, **kwargs):
    """Record the extra args and kwargs of a ``my_event`` firing on ``testcase.called_with``."""
    received = (args, kwargs)
    testcase.called_with.append(received)


class _TestArtifact(M.Artifact):
    """Minimal artifact used by the index task tests, with a settable shorthand id and a text field."""

    _shorthand_id = FieldProperty(str)
    text = FieldProperty(str)

    def url(self):
        """Return an empty URL."""
        return ''

    def shorthand_id(self):
        """Return ``_shorthand_id``, or ``_id`` when that attribute is missing."""
        fallback = self._id
        return getattr(self, '_shorthand_id', fallback)

    def index(self):
        """Return the parent's index data with this artifact's ``text`` added."""
        fields = super().index()
        return dict(fields, text=self.text)


class TestExportTasks:
    """Tests for the bulk export task and the ``BulkExport`` filters in ``allura.tasks.export_tasks``."""

    @staticmethod
    def _remove_export_dir():
        """Delete the bulk export directory of the ``test`` project, ignoring errors."""
        project = M.Project.query.get(shortname='test')
        export_path = project.bulk_export_path(tg.config['bulk_export_path'])
        shutil.rmtree(export_path, ignore_errors=True)

    def setup_method(self, method):
        """Run the basic test setup and start from a clean export directory."""
        setup_basic_test()
        setup_global_objects()
        self._remove_export_dir()

    def teardown_method(self, method):
        """Remove whatever the test left in the export directory."""
        self._remove_export_dir()

    def test_bulk_export_filter_exportable(self):
        """``filter_exportable`` keeps only the entries whose ``exportable`` flag is true."""
        exporter = export_tasks.BulkExport()
        exportable = mock.Mock(exportable=True)
        not_exportable = mock.Mock(exportable=False)
        candidates = [None, exportable, not_exportable]
        assert exporter.filter_exportable(candidates) == [exportable]

    def test_bulk_export_filter_successful(self):
        """``filter_successful`` drops ``None`` but keeps the strings, including ``'0'``."""
        exporter = export_tasks.BulkExport()
        kept = exporter.filter_successful(['foo', None, '0'])
        assert kept == ['foo', '0']

    @mock.patch('allura.tasks.export_tasks.shutil')
    @mock.patch('allura.tasks.export_tasks.zipdir')
    @mock.patch.dict(tg.config, {'bulk_export_filename': '{project}.zip'})
    @td.with_wiki
    def test_bulk_export(self, zipdir, shutil_mock):
        """``bulk_export`` zips the export dir, removes it and queues a completion email."""
        M.MonQTask.query.remove()
        export_tasks.bulk_export(['wiki'])
        export_dir = '/tmp/bulk_export/p/test/test'
        zip_path = '/tmp/bulk_export/p/test/test.zip'
        zipdir.assert_called_with(export_dir, zip_path)
        shutil_mock.rmtree.assert_called_once_with(six.ensure_binary(export_dir))
        # check notification
        mail_query = {'task_name': 'allura.tasks.mail_tasks.sendsimplemail'}
        tasks = M.MonQTask.query.find(mail_query).all()
        assert len(tasks) == 1
        mail_kwargs = tasks[0].kwargs
        assert mail_kwargs['subject'] == 'Bulk export for project test completed'
        assert mail_kwargs['fromaddr'] == '"Allura" <noreply@localhost>'
        assert mail_kwargs['reply_to'] == g.noreply
        text = mail_kwargs['text']
        assert 'The bulk export for project test is completed.' in text
        assert 'The following tools were exported:\n- wiki' in text
        assert 'Sample instructions for test' in text

    def test_bulk_export_status(self):
        """The project reports no status until an export is posted, then ``'busy'``."""
        assert c.project.bulk_export_status() is None
        export_tasks.bulk_export.post(['wiki'])
        status = c.project.bulk_export_status()
        assert status == 'busy'


class TestAdminTasks:
    """Tests for ``allura.tasks.admin_tasks``."""

    def test_install_app_docstring(self):
        """The ``install_app`` task's docstring contains the expected parameter list."""
        docstring = admin_tasks.install_app.__doc__
        assert 'ep_name, mount_point=None' in docstring


# Compile all Ming mappers when this module is imported; presumably needed so the
# _TestArtifact mapped class declared in this file is usable by the tests -- TODO confirm.
Mapper.compile_all()
